From 2a50b6bf048517aa1f63c4e29c3637f09c148fdb Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Thu, 27 Oct 2011 08:03:33 +0400 Subject: [PATCH] LU-795 osd api: Commit callback per transaction - ability to add commit callback per transaction in addition to per-device hooks. Now it is much simpler if only commit callback is needed. - rewrite commit callbacks for last_commit and new_client, add commit callback in seq manager - cleanup not-needed code: old commit callbacks, txn_keys - remove osd od_env_for_commit environment and env param from commit callbacks - use th_sync to mark sync operations Change-Id: If5f8f2a6d3cd2f3e77fd13c802213a181043a2d7 Signed-off-by: Mikhail Pershin Reviewed-on: http://review.whamcloud.com/1621 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/fid/fid_handler.c | 17 +----- lustre/fid/fid_store.c | 75 ++++++++++++++--------- lustre/include/dt_object.h | 44 ++++++++++---- lustre/include/lu_target.h | 5 +- lustre/include/lustre_fid.h | 4 +- lustre/mdd/mdd_device.c | 2 +- lustre/mdd/mdd_internal.h | 9 ++- lustre/mdd/mdd_object.c | 14 +++-- lustre/mdd/mdd_trans.c | 21 ++----- lustre/mdt/mdt_handler.c | 11 +--- lustre/mdt/mdt_internal.h | 26 -------- lustre/mdt/mdt_recovery.c | 63 +++++++------------ lustre/obdclass/dt_object.c | 17 ++---- lustre/osd-ldiskfs/osd_handler.c | 64 ++++++++++--------- lustre/osd-ldiskfs/osd_internal.h | 8 --- lustre/ptlrpc/target.c | 125 +++++++++++++++++++++++++++++++------- 16 files changed, 269 insertions(+), 236 deletions(-) diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index 2a33c43..a762edb 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -171,7 +171,6 @@ static int __seq_set_init(const struct lu_env *env, range_alloc(&seq->lss_hiwater_set, space, seq->lss_set_width); rc = seq_store_update(env, seq, NULL, 1); - seq->lss_set_transno = 0; return rc; } @@ -209,31 +208,21 @@ static int range_alloc_set(const struct lu_env *env, if (range_is_exhausted(loset)) { /* reached high water mark. */ struct lu_device *dev = seq->lss_site->ms_lu.ls_top_dev; - struct lu_target *tg = dev->ld_obd->u.obt.obt_lut; int obd_num_clients = dev->ld_obd->obd_num_exports; __u64 set_sz; - int sync = 0; /* calculate new seq width based on number of clients */ set_sz = max(seq->lss_set_width, - obd_num_clients * seq->lss_width); + obd_num_clients * seq->lss_width); set_sz = min(range_space(space), set_sz); /* Switch to hiwater range now */ - loset = hiset; + *loset = *hiset; /* allocate new hiwater range */ range_alloc(hiset, space, set_sz); - if (seq->lss_set_transno > dev->ld_obd->obd_last_committed) - sync = 1; - /* update ondisk seq with new *space */ - rc = seq_store_update(env, seq, NULL, sync); - - /* set new hiwater transno */ - cfs_spin_lock(&tg->lut_translock); - seq->lss_set_transno = tg->lut_last_transno; - cfs_spin_unlock(&tg->lut_translock); + rc = seq_store_update(env, seq, NULL, seq->lss_need_sync); } LASSERTF(!range_is_exhausted(loset) || range_is_sane(loset), diff --git a/lustre/fid/fid_store.c b/lustre/fid/fid_store.c index 2226d5e..66ce462 100644 --- a/lustre/fid/fid_store.c +++ b/lustre/fid/fid_store.c @@ -73,37 +73,37 @@ static struct lu_buf *seq_store_buf(struct seq_thread_info *info) return buf; } -struct thandle *seq_store_trans_start(struct lu_server_seq *seq, - const struct lu_env *env, int credit, - int sync) -{ - struct seq_thread_info *info; - struct dt_device *dt_dev; - struct thandle *th; - ENTRY; +struct seq_update_callback { + struct dt_txn_commit_cb suc_cb; + struct lu_server_seq *suc_seq; +}; - dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); - info = lu_context_key_get(&env->le_ctx, &seq_thread_key); - LASSERT(info != NULL); - - txn_param_init(&info->sti_txn, credit); - if (sync) - txn_param_sync(&info->sti_txn); - - th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn); - return th; +void seq_update_cb(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) +{ + struct seq_update_callback *ccb; + ccb = container_of0(cb, struct seq_update_callback, suc_cb); + ccb->suc_seq->lss_need_sync = 0; + cfs_list_del(&ccb->suc_cb.dcb_linkage); + OBD_FREE_PTR(ccb); } -void seq_store_trans_stop(struct lu_server_seq *seq, - const struct lu_env *env, - struct thandle *th) +int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq) { - struct dt_device *dt_dev; - ENTRY; - - dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); - - dt_dev->dd_ops->dt_trans_stop(env, th); + struct seq_update_callback *ccb; + int rc; + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->suc_cb.dcb_func = seq_update_cb; + CFS_INIT_LIST_HEAD(&ccb->suc_cb.dcb_linkage); + ccb->suc_seq = seq; + seq->lss_need_sync = 1; + rc = dt_trans_cb_add(th, &ccb->suc_cb); + if (rc) + OBD_FREE_PTR(ccb); + return rc; } /* This function implies that caller takes care about locking. */ @@ -143,14 +143,21 @@ int seq_store_write(struct lu_server_seq *seq, int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq, struct lu_seq_range *out, int sync) { + struct seq_thread_info *info; + struct dt_device *dt_dev; struct thandle *th; int rc; int credits = SEQ_TXN_STORE_CREDITS; + ENTRY; + + dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev); + info = lu_context_key_get(&env->le_ctx, &seq_thread_key); if (out != NULL) credits += FLD_TXN_INDEX_INSERT_CREDITS; - th = seq_store_trans_start(seq, env, credits, sync); + txn_param_init(&info->sti_txn, credits); + th = dt_trans_start(env, dt_dev, &info->sti_txn); if (IS_ERR(th)) RETURN(PTR_ERR(th)); @@ -158,16 +165,26 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq, if (rc) { CERROR("%s: Can't write space data, rc %d\n", seq->lss_name, rc); + GOTO(out,rc); } else if (out != NULL) { rc = fld_server_create(seq->lss_site->ms_server_fld, env, out, th); if (rc) { CERROR("%s: Can't Update fld database, rc %d\n", seq->lss_name, rc); + GOTO(out,rc); } } - seq_store_trans_stop(seq, env, th); + /* next sequence update will need sync until this update is committed + * in case of sync operation this is not needed obviously */ + if (!sync) + /* if callback can't be added then sync always */ + sync = !!seq_update_cb_add(th, seq); + + th->th_sync |= sync; +out: + dt_trans_stop(env, dt_dev, th); return rc; } diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index d02d664..39fde4b 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -84,6 +84,21 @@ struct dt_device_param { }; /** + * Per-transaction commit callback function + */ +struct dt_txn_commit_cb; +typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err); +/** + * Special per-transaction callback for cases when just commit callback + * is needed and per-device callback are not convenient to use + */ +struct dt_txn_commit_cb { + cfs_list_t dcb_linkage; + dt_cb_t dcb_func; +}; + +/** * Basic transaction credit op */ enum dt_txn_op { @@ -123,6 +138,11 @@ struct dt_device_operations { void (*dt_trans_stop)(const struct lu_env *env, struct thandle *th); /** + * Add commit callback to the transaction. + */ + int (*dt_trans_cb_add)(struct thandle *th, + struct dt_txn_commit_cb *dcb); + /** * Return fid of root index object. */ int (*dt_root_get)(const struct lu_env *env, @@ -538,8 +558,6 @@ static inline int dt_object_exists(const struct dt_object *dt) struct txn_param { /** number of blocks this transaction will modify */ unsigned int tp_credits; - /** sync transaction is needed */ - __u32 tp_sync:1; }; static inline void txn_param_init(struct txn_param *p, unsigned int credits) @@ -554,11 +572,6 @@ static inline void txn_param_credit_add(struct txn_param *p, p->tp_credits += credits; } -static inline void txn_param_sync(struct txn_param *p) -{ - p->tp_sync = 1; -} - /** * This is the general purpose transaction handle. * 1. Transaction Life Cycle @@ -583,6 +596,8 @@ struct thandle { /** the last operation result in this transaction. * this value is used in recovery */ __s32 th_result; + /** whether we need sync commit */ + int th_sync; }; /** @@ -601,8 +616,7 @@ struct dt_txn_callback { struct txn_param *param, void *cookie); int (*dtc_txn_stop)(const struct lu_env *env, struct thandle *txn, void *cookie); - int (*dtc_txn_commit)(const struct lu_env *env, - struct thandle *txn, void *cookie); + void (*dtc_txn_commit)(struct thandle *txn, void *cookie); void *dtc_cookie; __u32 dtc_tag; cfs_list_t dtc_linkage; @@ -614,7 +628,7 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb); int dt_txn_hook_start(const struct lu_env *env, struct dt_device *dev, struct txn_param *param); int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn); -int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn); +void dt_txn_hook_commit(struct thandle *txn); int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj); @@ -671,11 +685,17 @@ static inline struct thandle *dt_trans_start(const struct lu_env *env, } static inline void dt_trans_stop(const struct lu_env *env, - struct dt_device *d, - struct thandle *th) + struct dt_device *d, struct thandle *th) { LASSERT(d->dd_ops->dt_trans_stop); return d->dd_ops->dt_trans_stop(env, th); } + +static inline int dt_trans_cb_add(struct thandle *th, + struct dt_txn_commit_cb *dcb) +{ + LASSERT(th->th_dev->dd_ops->dt_trans_cb_add); + return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb); +} /** @} dt */ #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 1ec6e66..59fd19c 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -67,8 +67,9 @@ struct lut_commit_cb { }; void lut_boot_epoch_update(struct lu_target *); -void lut_cb_last_committed(struct lu_target *, __u64, void *, int); -void lut_cb_client(struct lu_target *, __u64, void *, int); +int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut, + struct obd_export *exp, __u64 transno); +int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp); int lut_init(const struct lu_env *, struct lu_target *, struct obd_device *, struct dt_device *); void lut_fini(const struct lu_env *, struct lu_target *); diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 6894735..adb2671 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -218,8 +218,8 @@ struct lu_server_seq { */ __u64 lss_set_width; - /* transaction no of seq update write operation */ - __u64 lss_set_transno; + /* sync is needed for update operation */ + __u32 lss_need_sync; /** * Pointer to site object, required to access site fld. */ diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 5f3a492..876a51a 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -93,7 +93,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d, /* Prepare transactions callbacks. */ mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb; mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb; - mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb; + mdd->mdd_txn_cb.dtc_txn_commit = NULL; mdd->mdd_txn_cb.dtc_cookie = mdd; mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD; CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage); diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index c49b727..e2ee0f2 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -427,9 +427,10 @@ int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj, struct md_attr *ma, enum mdd_txn_op, int changelog_cnt); -int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op, - int changelog_cnt); +void mdd_setattr_txn_param_build(const struct lu_env *env, + struct md_object *obj, + struct md_attr *ma, enum mdd_txn_op, + int changelog_cnt); int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *obj, struct lu_attr *la); @@ -452,8 +453,6 @@ int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param, int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie); -int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn, - void *cookie); /* mdd_device.c */ struct lu_object *mdd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 0c5c772..ecfe82b 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -1502,12 +1502,17 @@ static int mdd_attr_set(const struct lu_env *env, struct md_object *obj, chlog_cnt += (lmm->lmm_stripe_count >= 0) ? lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count; } + mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma, MDD_TXN_ATTR_SET_OP, chlog_cnt); handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) GOTO(no_trans, rc = PTR_ERR(handle)); + /* permission changes may require sync operation */ + if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) + handle->th_sync |= mdd->mdd_sync_permission; + if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)) CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n", ma->ma_attr.la_mtime, ma->ma_attr.la_ctime); @@ -1665,15 +1670,14 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj, RETURN(rc); mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1); - /* security-replated changes may require sync */ - if (!strcmp(name, XATTR_NAME_ACL_ACCESS) && - mdd->mdd_sync_permission == 1) - txn_param_sync(&mdd_env_info(env)->mti_param); - handle = mdd_trans_start(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + /* security-replated changes may require sync */ + if (!strcmp(name, XATTR_NAME_ACL_ACCESS)) + handle->th_sync |= mdd->mdd_sync_permission; + rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle); /* Only record user xattr changes */ diff --git a/lustre/mdd/mdd_trans.c b/lustre/mdd/mdd_trans.c index 001cac3..9519e38 100644 --- a/lustre/mdd/mdd_trans.c +++ b/lustre/mdd/mdd_trans.c @@ -100,12 +100,6 @@ int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, return mds_lov_write_objids(obd); } -int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn, - void *cookie) -{ - return 0; -} - void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd, enum mdd_txn_op op, int changelog_cnt) { @@ -182,24 +176,17 @@ out: RETURN(rc); } -int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj, - struct md_attr *ma, enum mdd_txn_op op, - int changelog_cnt) +void mdd_setattr_txn_param_build(const struct lu_env *env, + struct md_object *obj, + struct md_attr *ma, enum mdd_txn_op op, + int changelog_cnt) { struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj); - ENTRY; mdd_txn_param_build(env, mdd, op, changelog_cnt); if (ma->ma_attr.la_valid & (LA_UID | LA_GID)) txn_param_credit_add(&mdd_env_info(env)->mti_param, dto_txn_credits[DTO_ATTR_SET_CHOWN]); - - /* permission changes may require sync operation */ - if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID) && - mdd->mdd_sync_permission == 1) - txn_param_sync(&mdd_env_info(env)->mti_param); - - RETURN(0); } static void mdd_txn_init_dto_credits(const struct lu_env *env, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index e45abae..1398f1e 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -5669,15 +5669,6 @@ LU_KEY_INIT_FINI(mdt, struct mdt_thread_info); /* context key: mdt_thread_key */ LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD); -/* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */ -LU_KEY_INIT_FINI(mdt_txn, struct mdt_txn_info); - -struct lu_context_key mdt_txn_key = { - .lct_tags = LCT_TX_HANDLE, - .lct_init = mdt_txn_key_init, - .lct_fini = mdt_txn_key_fini -}; - struct md_ucred *mdt_ucred(const struct mdt_thread_info *info) { return md_ucred(info->mti_env); @@ -5720,7 +5711,7 @@ int mdt_cos_is_enabled(struct mdt_device *mdt) } /* type constructor/destructor: mdt_type_init, mdt_type_fini */ -LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key); +LU_TYPE_INIT_FINI(mdt, &mdt_thread_key); static struct lu_device_type_operations mdt_device_type_ops = { .ldto_init = mdt_type_init, diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index e5e2556..1317643 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -412,32 +412,6 @@ enum mdt_txn_op { MDT_TXN_LAST_RCVD_WRITE_OP, }; -/* - * Info allocated per-transaction. - */ -#define MDT_MAX_COMMIT_CB 4 -struct mdt_txn_info { - __u64 txi_transno; - unsigned int txi_cb_count; - struct lut_commit_cb txi_cb[MDT_MAX_COMMIT_CB]; -}; - -extern struct lu_context_key mdt_txn_key; - -static inline void mdt_trans_add_cb(const struct thandle *th, - lut_cb_t cb_func, void *cb_data) -{ - struct mdt_txn_info *txi; - - txi = lu_context_key_get(&th->th_ctx, &mdt_txn_key); - LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb)); - - /* add new callback */ - txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func; - txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data; - txi->txi_cb_count++; -} - static inline const struct md_device_operations * mdt_child_ops(struct mdt_device * m) { diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 103fc7a..8ae3631 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -123,17 +123,19 @@ struct thandle* mdt_trans_start(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); p = &mti->mti_txn_param; - /* export can require sync operations */ - if (mti->mti_exp != NULL) - p->tp_sync = mti->mti_exp->exp_need_sync; - - return mdt->mdt_bottom->dd_ops->dt_trans_start(env, mdt->mdt_bottom, p); + return dt_trans_start(env, mdt->mdt_bottom, p); } void mdt_trans_stop(const struct lu_env *env, struct mdt_device *mdt, struct thandle *th) { - mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th); + struct mdt_thread_info *mti; + + mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + /* export can require sync operations */ + if (mti->mti_exp != NULL) + th->th_sync |= mti->mti_exp->exp_need_sync; + dt_trans_stop(env, mdt->mdt_bottom, th); } static inline int mdt_last_rcvd_header_read(const struct lu_env *env, @@ -585,10 +587,15 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) * transaction so that many connecting clients will not bring * server down with lots of sync writes. */ - mdt_trans_add_cb(th, lut_cb_client, class_export_cb_get(mti->mti_exp)); - cfs_spin_lock(&mti->mti_exp->exp_lock); - mti->mti_exp->exp_need_sync = 1; - cfs_spin_unlock(&mti->mti_exp->exp_lock); + rc = lut_new_client_cb_add(th, mti->mti_exp); + if (rc) { + /* can't add callback, do sync now */ + th->th_sync = 1; + } else { + cfs_spin_lock(&mti->mti_exp->exp_lock); + mti->mti_exp->exp_need_sync = 1; + cfs_spin_unlock(&mti->mti_exp->exp_lock); + } rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th); CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n", @@ -848,17 +855,13 @@ static int mdt_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie) { struct mdt_device *mdt = cookie; - struct mdt_txn_info *txi; struct mdt_thread_info *mti; struct ptlrpc_request *req; - /* transno in two contexts - for commit_cb and for thread */ - txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key); mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); req = mdt_info_req(mti); if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) { - txi->txi_transno = 0; mti->mti_no_need_trans = 0; return 0; } @@ -899,35 +902,13 @@ static int mdt_txn_stop_cb(const struct lu_env *env, req->rq_transno = mti->mti_transno; lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno); - /* save transno for the commit callback */ - txi->txi_transno = mti->mti_transno; - - /* add separate commit callback for transaction handling because we need - * export as parameter */ - mdt_trans_add_cb(txn, lut_cb_last_committed, - class_export_cb_get(mti->mti_exp)); - + /* if can't add callback, do sync write */ + txn->th_sync = !!lut_last_commit_cb_add(txn, &mdt->mdt_lut, + mti->mti_exp, + mti->mti_transno); return mdt_last_rcvd_update(mti, txn); } -/* commit callback, need to update last_committed value */ -static int mdt_txn_commit_cb(const struct lu_env *env, - struct thandle *txn, void *cookie) -{ - struct mdt_device *mdt = cookie; - struct mdt_txn_info *txi; - int i; - - txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key); - - /* iterate through all additional callbacks */ - for (i = 0; i < txi->txi_cb_count; i++) { - txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno, - txi->txi_cb[i].lut_cb_data, 0); - } - return 0; -} - int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, struct obd_device *obd, struct lustre_sb_info *lsi) @@ -943,7 +924,7 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, /* prepare transactions callbacks */ mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb; mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb; - mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb; + mdt->mdt_txn_cb.dtc_txn_commit = NULL; mdt->mdt_txn_cb.dtc_cookie = mdt; mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD; CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage); diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 3164187..3d08d2f 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -126,22 +126,15 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn) } EXPORT_SYMBOL(dt_txn_hook_stop); -int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn) +void dt_txn_hook_commit(struct thandle *txn) { - struct dt_device *dev = txn->th_dev; struct dt_txn_callback *cb; - int result; - result = 0; - cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { - if (cb->dtc_txn_commit == NULL || - !(cb->dtc_tag & env->le_ctx.lc_tags)) - continue; - result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie); - if (result < 0) - break; + cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks, + dtc_linkage) { + if (cb->dtc_txn_commit) + cb->dtc_txn_commit(txn, cb->dtc_cookie); } - return result; } EXPORT_SYMBOL(dt_txn_hook_commit); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 9bdf41c..9e4f75b 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -136,6 +136,7 @@ struct osd_thandle { struct thandle ot_super; handle_t *ot_handle; struct journal_callback ot_jcb; + cfs_list_t ot_dcb_list; /* Link to the device, for debugging. */ struct lu_ref_link *ot_dev_link; @@ -629,24 +630,19 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error) { struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb); struct thandle *th = &oh->ot_super; - struct dt_device *dev = th->th_dev; - struct lu_device *lud = &dev->dd_lu_dev; + struct lu_device *lud = &th->th_dev->dd_lu_dev; + struct dt_txn_commit_cb *dcb, *tmp; - LASSERT(dev != NULL); LASSERT(oh->ot_handle == NULL); - if (error) { + if (error) CERROR("transaction @0x%p commit error: %d\n", th, error); - } else { - struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit; - /* - * This od_env_for_commit is only for commit usage. see - * "struct dt_device" - */ - lu_context_enter(&env->le_ctx); - dt_txn_hook_commit(env, th); - lu_context_exit(&env->le_ctx); - } + + dt_txn_hook_commit(th); + + /* call per-transaction callbacks if any */ + cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) + dcb->dcb_func(NULL, th, dcb, error); lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th); lu_device_put(lud); @@ -686,6 +682,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env, * be used. */ oti->oti_dev = dev; + CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); osd_th_alloced(oh); jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits); osd_th_started(oh); @@ -694,7 +691,7 @@ static struct thandle *osd_trans_start(const struct lu_env *env, th = &oh->ot_super; th->th_dev = d; th->th_result = 0; - jh->h_sync = p->tp_sync; + th->th_sync = 0; lu_device_get(&d->dd_lu_dev); oh->ot_dev_link = lu_ref_add (&d->dd_lu_dev.ld_reference, @@ -702,8 +699,6 @@ static struct thandle *osd_trans_start(const struct lu_env *env, /* add commit callback */ lu_context_init(&th->th_ctx, LCT_TX_HANDLE); lu_context_enter(&th->th_ctx); - osd_journal_callback_set(jh,osd_trans_commit_cb, - &oh->ot_jcb); LASSERT(oti->oti_txns == 0); LASSERT(oti->oti_r_locks == 0); LASSERT(oti->oti_w_locks == 0); @@ -737,6 +732,15 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th) if (oh->ot_handle != NULL) { handle_t *hdl = oh->ot_handle; + hdl->h_sync = th->th_sync; + /* + * add commit callback + * notice we don't do this in osd_trans_start() + * as underlying transaction can change during truncate + */ + osd_journal_callback_set(hdl, osd_trans_commit_cb, + &oh->ot_jcb); + LASSERT(oti->oti_txns == 1); oti->oti_txns--; LASSERT(oti->oti_r_locks == 0); @@ -749,10 +753,22 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th) result = ldiskfs_journal_stop(hdl)); if (result != 0) CERROR("Failure to stop transaction: %d\n", result); + } else { + OBD_FREE_PTR(oh); } EXIT; } +static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) +{ + struct osd_thandle *oh = container_of0(th, struct osd_thandle, + ot_super); + + cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list); + + return 0; +} + /* * Concurrency: no concurrent access is possible that late in object * life-cycle. @@ -1137,6 +1153,7 @@ static const struct dt_device_operations osd_dt_ops = { .dt_statfs = osd_statfs, .dt_trans_start = osd_trans_start, .dt_trans_stop = osd_trans_stop, + .dt_trans_cb_add = osd_trans_cb_add, .dt_conf_get = osd_conf_get, .dt_sync = osd_sync, .dt_ro = osd_ro, @@ -3873,17 +3890,7 @@ static struct lu_context_key osd_key = { static int osd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { - int rc; - struct lu_context *ctx; - - /* context for commit hooks */ - ctx = &osd_dev(d)->od_env_for_commit.le_ctx; - rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF); - if (rc == 0) { - rc = osd_procfs_init(osd_dev(d), name); - ctx->lc_cookie = 0x3; - } - return rc; + return osd_procfs_init(osd_dev(d), name); } static int osd_shutdown(const struct lu_env *env, struct osd_device *o) @@ -3957,7 +3964,6 @@ static struct lu_device *osd_device_fini(const struct lu_env *env, osd_dev(d)->od_mount->lmi_mnt); osd_dev(d)->od_mount = NULL; - lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx); RETURN(NULL); } diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 343287c..5b3eaa4 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -114,14 +114,6 @@ struct osd_device { */ struct dt_object *od_obj_area; - /* Environment for transaction commit callback. - * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD - * is serialized, that is there is no more than one transaction commit - * at a time (JBD journal_commit_transaction() is serialized). - * This means that it's enough to have _one_ lu_context. - */ - struct lu_env od_env_for_commit; - /* * Fid Capability */ diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index df732ad..0de17bf 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -41,6 +41,7 @@ #include #include +#include /** * Update client data in last_rcvd file. An obd API @@ -338,37 +339,115 @@ EXPORT_SYMBOL(lut_boot_epoch_update); /** * commit callback, need to update last_commited value */ -void lut_cb_last_committed(struct lu_target *lut, __u64 transno, - void *data, int err) +struct lut_last_committed_callback { + struct dt_txn_commit_cb llcc_cb; + struct lu_target *llcc_lut; + struct obd_export *llcc_exp; + __u64 llcc_transno; +}; + +void lut_cb_last_committed(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) { - struct obd_export *exp = data; - LASSERT(exp->exp_obd == lut->lut_obd); - cfs_spin_lock(&lut->lut_translock); - if (transno > lut->lut_obd->obd_last_committed) - lut->lut_obd->obd_last_committed = transno; - - LASSERT(exp); - if (transno > exp->exp_last_committed) { - exp->exp_last_committed = transno; - cfs_spin_unlock(&lut->lut_translock); - ptlrpc_commit_replies(exp); + struct lut_last_committed_callback *ccb; + + ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb); + + LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd); + + cfs_spin_lock(&ccb->llcc_lut->lut_translock); + if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed) + ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno; + + LASSERT(ccb->llcc_exp); + if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) { + ccb->llcc_exp->exp_last_committed = ccb->llcc_transno; + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); + ptlrpc_commit_replies(ccb->llcc_exp); } else { - cfs_spin_unlock(&lut->lut_translock); + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); } - class_export_cb_put(exp); - if (transno) + class_export_cb_put(ccb->llcc_exp); + if (ccb->llcc_transno) CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", - lut->lut_obd->obd_name, transno); + ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno); + cfs_list_del(&ccb->llcc_cb.dcb_linkage); + OBD_FREE_PTR(ccb); +} + +int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut, + struct obd_export *exp, __u64 transno) +{ + struct lut_last_committed_callback *ccb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->llcc_cb.dcb_func = lut_cb_last_committed; + CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage); + ccb->llcc_lut = lut; + ccb->llcc_exp = class_export_cb_get(exp); + ccb->llcc_transno = transno; + + rc = dt_trans_cb_add(th, &ccb->llcc_cb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + return rc; } -EXPORT_SYMBOL(lut_cb_last_committed); +EXPORT_SYMBOL(lut_last_commit_cb_add); + +struct lut_new_client_callback { + struct dt_txn_commit_cb lncc_cb; + struct obd_export *lncc_exp; +}; -void lut_cb_client(struct lu_target *lut, __u64 transno, - void *data, int err) +void lut_cb_new_client(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) { - LASSERT(lut->lut_obd); - target_client_add_cb(lut->lut_obd, transno, data, err); + struct lut_new_client_callback *ccb; + + ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb); + + LASSERT(ccb->lncc_exp->exp_obd); + + CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n", + ccb->lncc_exp->exp_obd->obd_name, + ccb->lncc_exp->exp_client_uuid.uuid); + + cfs_spin_lock(&ccb->lncc_exp->exp_lock); + ccb->lncc_exp->exp_need_sync = 0; + cfs_spin_unlock(&ccb->lncc_exp->exp_lock); + class_export_cb_put(ccb->lncc_exp); + + cfs_list_del(&ccb->lncc_cb.dcb_linkage); + OBD_FREE_PTR(ccb); +} + +int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp) +{ + struct lut_new_client_callback *ccb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->lncc_cb.dcb_func = lut_cb_new_client; + CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage); + ccb->lncc_exp = class_export_cb_get(exp); + + rc = dt_trans_cb_add(th, &ccb->lncc_cb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + return rc; } -EXPORT_SYMBOL(lut_cb_client); +EXPORT_SYMBOL(lut_new_client_cb_add); int lut_init(const struct lu_env *env, struct lu_target *lut, struct obd_device *obd, struct dt_device *dt) -- 1.8.3.1