range_alloc(&seq->lss_hiwater_set, space, seq->lss_set_width);
rc = seq_store_update(env, seq, NULL, 1);
- seq->lss_set_transno = 0;
return rc;
}
if (range_is_exhausted(loset)) {
/* reached high water mark. */
struct lu_device *dev = seq->lss_site->ms_lu.ls_top_dev;
- struct lu_target *tg = dev->ld_obd->u.obt.obt_lut;
int obd_num_clients = dev->ld_obd->obd_num_exports;
__u64 set_sz;
- int sync = 0;
/* calculate new seq width based on number of clients */
set_sz = max(seq->lss_set_width,
- obd_num_clients * seq->lss_width);
+ obd_num_clients * seq->lss_width);
set_sz = min(range_space(space), set_sz);
/* Switch to hiwater range now */
- loset = hiset;
+ *loset = *hiset;
/* allocate new hiwater range */
range_alloc(hiset, space, set_sz);
- if (seq->lss_set_transno > dev->ld_obd->obd_last_committed)
- sync = 1;
-
/* update ondisk seq with new *space */
- rc = seq_store_update(env, seq, NULL, sync);
-
- /* set new hiwater transno */
- cfs_spin_lock(&tg->lut_translock);
- seq->lss_set_transno = tg->lut_last_transno;
- cfs_spin_unlock(&tg->lut_translock);
+ rc = seq_store_update(env, seq, NULL, seq->lss_need_sync);
}
LASSERTF(!range_is_exhausted(loset) || range_is_sane(loset),
return buf;
}
-struct thandle *seq_store_trans_start(struct lu_server_seq *seq,
- const struct lu_env *env, int credit,
- int sync)
-{
- struct seq_thread_info *info;
- struct dt_device *dt_dev;
- struct thandle *th;
- ENTRY;
+/* Commit-callback cookie for a sequence store update: once the
+ * transaction commits, the callback clears lss_need_sync on the
+ * embedded server sequence. Allocated per update, freed in the cb. */
+struct seq_update_callback {
+ struct dt_txn_commit_cb suc_cb;
+ struct lu_server_seq *suc_seq;
+};
- dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
- info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
- LASSERT(info != NULL);
-
- txn_param_init(&info->sti_txn, credit);
- if (sync)
- txn_param_sync(&info->sti_txn);
-
- th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
- return th;
+/* Transaction commit callback: the sequence update is now durable on
+ * disk, so later updates no longer have to be synchronous. Detaches
+ * itself from the transaction's callback list and frees the cookie.
+ * NOTE(review): runs in commit context; env/th may be NULL-ish here
+ * (osd passes NULL env) — do not dereference them. */
+void seq_update_cb(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
+{
+ struct seq_update_callback *ccb;
+ ccb = container_of0(cb, struct seq_update_callback, suc_cb);
+ /* update committed: drop the sync requirement */
+ ccb->suc_seq->lss_need_sync = 0;
+ cfs_list_del(&ccb->suc_cb.dcb_linkage);
+ OBD_FREE_PTR(ccb);
}
-void seq_store_trans_stop(struct lu_server_seq *seq,
- const struct lu_env *env,
- struct thandle *th)
+/* Register a commit callback on @th that clears seq->lss_need_sync
+ * once the transaction commits. Sets lss_need_sync up front so that
+ * concurrent updates stay synchronous until commit is confirmed.
+ * Returns 0 on success, -ENOMEM or dt_trans_cb_add() error otherwise.
+ * NOTE(review): on failure lss_need_sync stays set; the caller falls
+ * back to a sync write, so this only over-syncs, never under-syncs. */
+int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
{
- struct dt_device *dt_dev;
- ENTRY;
-
- dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
-
- dt_dev->dd_ops->dt_trans_stop(env, th);
+ struct seq_update_callback *ccb;
+ int rc;
+ OBD_ALLOC_PTR(ccb);
+ if (ccb == NULL)
+ return -ENOMEM;
+
+ ccb->suc_cb.dcb_func = seq_update_cb;
+ CFS_INIT_LIST_HEAD(&ccb->suc_cb.dcb_linkage);
+ ccb->suc_seq = seq;
+ seq->lss_need_sync = 1;
+ rc = dt_trans_cb_add(th, &ccb->suc_cb);
+ if (rc)
+ OBD_FREE_PTR(ccb);
+ return rc;
}
/* This function implies that caller takes care about locking. */
int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
struct lu_seq_range *out, int sync)
{
+ struct seq_thread_info *info;
+ struct dt_device *dt_dev;
struct thandle *th;
int rc;
int credits = SEQ_TXN_STORE_CREDITS;
+ ENTRY;
+
+ dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+ info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
+ /* keep the sanity check the removed seq_store_trans_start() had */
+ LASSERT(info != NULL);
if (out != NULL)
credits += FLD_TXN_INDEX_INSERT_CREDITS;
- th = seq_store_trans_start(seq, env, credits, sync);
+ txn_param_init(&info->sti_txn, credits);
+ th = dt_trans_start(env, dt_dev, &info->sti_txn);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
+ GOTO(out, rc);
} else if (out != NULL) {
rc = fld_server_create(seq->lss_site->ms_server_fld,
env, out, th);
if (rc) {
CERROR("%s: Can't Update fld database, rc %d\n",
seq->lss_name, rc);
+ GOTO(out, rc);
}
}
- seq_store_trans_stop(seq, env, th);
+ /* next sequence update will need sync until this update is committed
+ * in case of sync operation this is not needed obviously */
+ if (!sync)
+ /* if callback can't be added then sync always */
+ sync = !!seq_update_cb_add(th, seq);
+
+ th->th_sync |= sync;
+out:
+ dt_trans_stop(env, dt_dev, th);
return rc;
}
};
/**
+ * Per-transaction commit callback function
+ */
+struct dt_txn_commit_cb;
+/* Signature of a per-transaction commit callback; invoked after the
+ * journal commits the transaction, with the original error code. */
+typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err);
+/**
+ * Special per-transaction callback for cases when just a commit callback
+ * is needed and per-device callbacks are not convenient to use
+ */
+struct dt_txn_commit_cb {
+ cfs_list_t dcb_linkage;
+ dt_cb_t dcb_func;
+};
+
+/**
* Basic transaction credit op
*/
enum dt_txn_op {
void (*dt_trans_stop)(const struct lu_env *env,
struct thandle *th);
/**
+ * Add commit callback to the transaction.
+ */
+ int (*dt_trans_cb_add)(struct thandle *th,
+ struct dt_txn_commit_cb *dcb);
+ /**
* Return fid of root index object.
*/
int (*dt_root_get)(const struct lu_env *env,
struct txn_param {
/** number of blocks this transaction will modify */
unsigned int tp_credits;
- /** sync transaction is needed */
- __u32 tp_sync:1;
};
static inline void txn_param_init(struct txn_param *p, unsigned int credits)
p->tp_credits += credits;
}
-static inline void txn_param_sync(struct txn_param *p)
-{
- p->tp_sync = 1;
-}
-
/**
* This is the general purpose transaction handle.
* 1. Transaction Life Cycle
/** the last operation result in this transaction.
* this value is used in recovery */
__s32 th_result;
+ /** whether we need sync commit */
+ int th_sync;
};
/**
struct txn_param *param, void *cookie);
int (*dtc_txn_stop)(const struct lu_env *env,
struct thandle *txn, void *cookie);
- int (*dtc_txn_commit)(const struct lu_env *env,
- struct thandle *txn, void *cookie);
+ void (*dtc_txn_commit)(struct thandle *txn, void *cookie);
void *dtc_cookie;
__u32 dtc_tag;
cfs_list_t dtc_linkage;
int dt_txn_hook_start(const struct lu_env *env,
struct dt_device *dev, struct txn_param *param);
int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn);
-int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn);
+void dt_txn_hook_commit(struct thandle *txn);
int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj);
}
static inline void dt_trans_stop(const struct lu_env *env,
- struct dt_device *d,
- struct thandle *th)
+ struct dt_device *d, struct thandle *th)
{
LASSERT(d->dd_ops->dt_trans_stop);
return d->dd_ops->dt_trans_stop(env, th);
}
+
+/* Attach a commit callback @dcb to transaction @th via the device's
+ * dt_trans_cb_add method; returns 0 or a negative errno. */
+static inline int dt_trans_cb_add(struct thandle *th,
+ struct dt_txn_commit_cb *dcb)
+{
+ LASSERT(th->th_dev->dd_ops->dt_trans_cb_add);
+ return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
+}
/** @} dt */
#endif /* __LUSTRE_DT_OBJECT_H */
};
void lut_boot_epoch_update(struct lu_target *);
-void lut_cb_last_committed(struct lu_target *, __u64, void *, int);
-void lut_cb_client(struct lu_target *, __u64, void *, int);
+int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
+ struct obd_export *exp, __u64 transno);
+int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp);
int lut_init(const struct lu_env *, struct lu_target *,
struct obd_device *, struct dt_device *);
void lut_fini(const struct lu_env *, struct lu_target *);
*/
__u64 lss_set_width;
- /* transaction no of seq update write operation */
- __u64 lss_set_transno;
+ /* sync is needed for update operation */
+ __u32 lss_need_sync;
/**
* Pointer to site object, required to access site fld.
*/
/* Prepare transactions callbacks. */
mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
- mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
+ mdd->mdd_txn_cb.dtc_txn_commit = NULL;
mdd->mdd_txn_cb.dtc_cookie = mdd;
mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD;
CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage);
int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma, enum mdd_txn_op,
int changelog_cnt);
-int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op,
- int changelog_cnt);
+void mdd_setattr_txn_param_build(const struct lu_env *env,
+ struct md_object *obj,
+ struct md_attr *ma, enum mdd_txn_op,
+ int changelog_cnt);
int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *obj, struct lu_attr *la);
int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
void *cookie);
-int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
- void *cookie);
/* mdd_device.c */
struct lu_object *mdd_object_alloc(const struct lu_env *env,
const struct lu_object_header *hdr,
chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
}
+
mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
MDD_TXN_ATTR_SET_OP, chlog_cnt);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
GOTO(no_trans, rc = PTR_ERR(handle));
+ /* permission changes may require sync operation */
+ if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+ handle->th_sync |= mdd->mdd_sync_permission;
+
if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
RETURN(rc);
mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
- /* security-replated changes may require sync */
- if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
- mdd->mdd_sync_permission == 1)
- txn_param_sync(&mdd_env_info(env)->mti_param);
-
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ /* security-replated changes may require sync */
+ if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+ handle->th_sync |= mdd->mdd_sync_permission;
+
rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
/* Only record user xattr changes */
return mds_lov_write_objids(obd);
}
-int mdd_txn_commit_cb(const struct lu_env *env, struct thandle *txn,
- void *cookie)
-{
- return 0;
-}
-
void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
enum mdd_txn_op op, int changelog_cnt)
{
RETURN(rc);
}
-int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op op,
- int changelog_cnt)
+void mdd_setattr_txn_param_build(const struct lu_env *env,
+ struct md_object *obj,
+ struct md_attr *ma, enum mdd_txn_op op,
+ int changelog_cnt)
{
struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
- ENTRY;
mdd_txn_param_build(env, mdd, op, changelog_cnt);
if (ma->ma_attr.la_valid & (LA_UID | LA_GID))
txn_param_credit_add(&mdd_env_info(env)->mti_param,
dto_txn_credits[DTO_ATTR_SET_CHOWN]);
-
- /* permission changes may require sync operation */
- if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID) &&
- mdd->mdd_sync_permission == 1)
- txn_param_sync(&mdd_env_info(env)->mti_param);
-
- RETURN(0);
}
static void mdd_txn_init_dto_credits(const struct lu_env *env,
/* context key: mdt_thread_key */
LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
-/* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
-LU_KEY_INIT_FINI(mdt_txn, struct mdt_txn_info);
-
-struct lu_context_key mdt_txn_key = {
- .lct_tags = LCT_TX_HANDLE,
- .lct_init = mdt_txn_key_init,
- .lct_fini = mdt_txn_key_fini
-};
-
struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
{
return md_ucred(info->mti_env);
}
/* type constructor/destructor: mdt_type_init, mdt_type_fini */
-LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
+LU_TYPE_INIT_FINI(mdt, &mdt_thread_key);
static struct lu_device_type_operations mdt_device_type_ops = {
.ldto_init = mdt_type_init,
MDT_TXN_LAST_RCVD_WRITE_OP,
};
-/*
- * Info allocated per-transaction.
- */
-#define MDT_MAX_COMMIT_CB 4
-struct mdt_txn_info {
- __u64 txi_transno;
- unsigned int txi_cb_count;
- struct lut_commit_cb txi_cb[MDT_MAX_COMMIT_CB];
-};
-
-extern struct lu_context_key mdt_txn_key;
-
-static inline void mdt_trans_add_cb(const struct thandle *th,
- lut_cb_t cb_func, void *cb_data)
-{
- struct mdt_txn_info *txi;
-
- txi = lu_context_key_get(&th->th_ctx, &mdt_txn_key);
- LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb));
-
- /* add new callback */
- txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func;
- txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data;
- txi->txi_cb_count++;
-}
-
static inline const struct md_device_operations *
mdt_child_ops(struct mdt_device * m)
{
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
p = &mti->mti_txn_param;
- /* export can require sync operations */
- if (mti->mti_exp != NULL)
- p->tp_sync = mti->mti_exp->exp_need_sync;
-
- return mdt->mdt_bottom->dd_ops->dt_trans_start(env, mdt->mdt_bottom, p);
+ return dt_trans_start(env, mdt->mdt_bottom, p);
}
void mdt_trans_stop(const struct lu_env *env,
struct mdt_device *mdt, struct thandle *th)
{
- mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th);
+ struct mdt_thread_info *mti;
+
+ mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ /* export can require sync operations */
+ if (mti->mti_exp != NULL)
+ th->th_sync |= mti->mti_exp->exp_need_sync;
+ dt_trans_stop(env, mdt->mdt_bottom, th);
}
static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
* transaction so that many connecting clients will not bring
* server down with lots of sync writes.
*/
- mdt_trans_add_cb(th, lut_cb_client, class_export_cb_get(mti->mti_exp));
- cfs_spin_lock(&mti->mti_exp->exp_lock);
- mti->mti_exp->exp_need_sync = 1;
- cfs_spin_unlock(&mti->mti_exp->exp_lock);
+ rc = lut_new_client_cb_add(th, mti->mti_exp);
+ if (rc) {
+ /* can't add callback, do sync now */
+ th->th_sync = 1;
+ } else {
+ cfs_spin_lock(&mti->mti_exp->exp_lock);
+ mti->mti_exp->exp_need_sync = 1;
+ cfs_spin_unlock(&mti->mti_exp->exp_lock);
+ }
rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
struct thandle *txn, void *cookie)
{
struct mdt_device *mdt = cookie;
- struct mdt_txn_info *txi;
struct mdt_thread_info *mti;
struct ptlrpc_request *req;
- /* transno in two contexts - for commit_cb and for thread */
- txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
req = mdt_info_req(mti);
if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
- txi->txi_transno = 0;
mti->mti_no_need_trans = 0;
return 0;
}
req->rq_transno = mti->mti_transno;
lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
- /* save transno for the commit callback */
- txi->txi_transno = mti->mti_transno;
-
- /* add separate commit callback for transaction handling because we need
- * export as parameter */
- mdt_trans_add_cb(txn, lut_cb_last_committed,
- class_export_cb_get(mti->mti_exp));
-
+ /* if can't add callback, do sync write */
+ txn->th_sync = !!lut_last_commit_cb_add(txn, &mdt->mdt_lut,
+ mti->mti_exp,
+ mti->mti_transno);
return mdt_last_rcvd_update(mti, txn);
}
-/* commit callback, need to update last_committed value */
-static int mdt_txn_commit_cb(const struct lu_env *env,
- struct thandle *txn, void *cookie)
-{
- struct mdt_device *mdt = cookie;
- struct mdt_txn_info *txi;
- int i;
-
- txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
-
- /* iterate through all additional callbacks */
- for (i = 0; i < txi->txi_cb_count; i++) {
- txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno,
- txi->txi_cb[i].lut_cb_data, 0);
- }
- return 0;
-}
-
int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
struct obd_device *obd,
struct lustre_sb_info *lsi)
/* prepare transactions callbacks */
mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
- mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
+ mdt->mdt_txn_cb.dtc_txn_commit = NULL;
mdt->mdt_txn_cb.dtc_cookie = mdt;
mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
}
EXPORT_SYMBOL(dt_txn_hook_stop);
-int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn)
+/* Run device-level dtc_txn_commit hooks for @txn. Void now: commit
+ * context has no env and callback results cannot be acted upon. */
+void dt_txn_hook_commit(struct thandle *txn)
{
- struct dt_device *dev = txn->th_dev;
struct dt_txn_callback *cb;
- int result;
- result = 0;
- cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_commit == NULL ||
- !(cb->dtc_tag & env->le_ctx.lc_tags))
- continue;
- result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie);
- if (result < 0)
- break;
+ cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
+ dtc_linkage) {
+ if (cb->dtc_txn_commit)
+ cb->dtc_txn_commit(txn, cb->dtc_cookie);
}
- return result;
}
EXPORT_SYMBOL(dt_txn_hook_commit);
struct thandle ot_super;
handle_t *ot_handle;
struct journal_callback ot_jcb;
+ cfs_list_t ot_dcb_list;
/* Link to the device, for debugging. */
struct lu_ref_link *ot_dev_link;
{
struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
struct thandle *th = &oh->ot_super;
- struct dt_device *dev = th->th_dev;
- struct lu_device *lud = &dev->dd_lu_dev;
+ struct lu_device *lud = &th->th_dev->dd_lu_dev;
+ struct dt_txn_commit_cb *dcb, *tmp;
- LASSERT(dev != NULL);
LASSERT(oh->ot_handle == NULL);
- if (error) {
+ if (error)
CERROR("transaction @0x%p commit error: %d\n", th, error);
- } else {
- struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
- /*
- * This od_env_for_commit is only for commit usage. see
- * "struct dt_device"
- */
- lu_context_enter(&env->le_ctx);
- dt_txn_hook_commit(env, th);
- lu_context_exit(&env->le_ctx);
- }
+
+ dt_txn_hook_commit(th);
+
+ /* call per-transaction callbacks if any */
+ cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+ dcb->dcb_func(NULL, th, dcb, error);
lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
lu_device_put(lud);
* be used.
*/
oti->oti_dev = dev;
+ CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
osd_th_alloced(oh);
jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
osd_th_started(oh);
th = &oh->ot_super;
th->th_dev = d;
th->th_result = 0;
- jh->h_sync = p->tp_sync;
+ th->th_sync = 0;
lu_device_get(&d->dd_lu_dev);
oh->ot_dev_link = lu_ref_add
(&d->dd_lu_dev.ld_reference,
/* add commit callback */
lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
lu_context_enter(&th->th_ctx);
- osd_journal_callback_set(jh,osd_trans_commit_cb,
- &oh->ot_jcb);
LASSERT(oti->oti_txns == 0);
LASSERT(oti->oti_r_locks == 0);
LASSERT(oti->oti_w_locks == 0);
if (oh->ot_handle != NULL) {
handle_t *hdl = oh->ot_handle;
+ hdl->h_sync = th->th_sync;
+ /*
+ * add commit callback
+ * notice we don't do this in osd_trans_start()
+ * as underlying transaction can change during truncate
+ */
+ osd_journal_callback_set(hdl, osd_trans_commit_cb,
+ &oh->ot_jcb);
+
LASSERT(oti->oti_txns == 1);
oti->oti_txns--;
LASSERT(oti->oti_r_locks == 0);
result = ldiskfs_journal_stop(hdl));
if (result != 0)
CERROR("Failure to stop transaction: %d\n", result);
+ } else {
+ OBD_FREE_PTR(oh);
}
EXIT;
}
+/* dt_trans_cb_add method of the OSD: queue @dcb on the handle's
+ * private list; it is fired from osd_trans_commit_cb() at commit.
+ * NOTE(review): list is unlocked — assumes additions are serialized
+ * by the transaction owner; confirm against callers. */
+static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+ struct osd_thandle *oh = container_of0(th, struct osd_thandle,
+ ot_super);
+
+ cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+
+ return 0;
+}
+
/*
* Concurrency: no concurrent access is possible that late in object
* life-cycle.
.dt_statfs = osd_statfs,
.dt_trans_start = osd_trans_start,
.dt_trans_stop = osd_trans_stop,
+ .dt_trans_cb_add = osd_trans_cb_add,
.dt_conf_get = osd_conf_get,
.dt_sync = osd_sync,
.dt_ro = osd_ro,
static int osd_device_init(const struct lu_env *env, struct lu_device *d,
const char *name, struct lu_device *next)
{
- int rc;
- struct lu_context *ctx;
-
- /* context for commit hooks */
- ctx = &osd_dev(d)->od_env_for_commit.le_ctx;
- rc = lu_context_init(ctx, LCT_MD_THREAD|LCT_REMEMBER|LCT_NOREF);
- if (rc == 0) {
- rc = osd_procfs_init(osd_dev(d), name);
- ctx->lc_cookie = 0x3;
- }
- return rc;
+ return osd_procfs_init(osd_dev(d), name);
}
static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
osd_dev(d)->od_mount->lmi_mnt);
osd_dev(d)->od_mount = NULL;
- lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
RETURN(NULL);
}
*/
struct dt_object *od_obj_area;
- /* Environment for transaction commit callback.
- * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
- * is serialized, that is there is no more than one transaction commit
- * at a time (JBD journal_commit_transaction() is serialized).
- * This means that it's enough to have _one_ lu_context.
- */
- struct lu_env od_env_for_commit;
-
/*
* Fid Capability
*/
#include <obd.h>
#include <lustre_fsfilt.h>
+#include <obd_class.h>
/**
* Update client data in last_rcvd file. An obd API
/**
* commit callback, need to update last_commited value
*/
-void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
- void *data, int err)
+/* Commit-callback cookie carrying the target, export and transno of
+ * one committed update; freed by lut_cb_last_committed() itself. */
+struct lut_last_committed_callback {
+ struct dt_txn_commit_cb llcc_cb;
+ struct lu_target *llcc_lut;
+ struct obd_export *llcc_exp;
+ __u64 llcc_transno;
+};
+
+/* Commit callback: raise obd/export last_committed to llcc_transno
+ * under lut_translock, wake replies waiting on commit, then drop the
+ * export reference taken by lut_last_commit_cb_add() and free self. */
+void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
{
- struct obd_export *exp = data;
- LASSERT(exp->exp_obd == lut->lut_obd);
- cfs_spin_lock(&lut->lut_translock);
- if (transno > lut->lut_obd->obd_last_committed)
- lut->lut_obd->obd_last_committed = transno;
-
- LASSERT(exp);
- if (transno > exp->exp_last_committed) {
- exp->exp_last_committed = transno;
- cfs_spin_unlock(&lut->lut_translock);
- ptlrpc_commit_replies(exp);
+ struct lut_last_committed_callback *ccb;
+
+ ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
+
+ LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
+
+ cfs_spin_lock(&ccb->llcc_lut->lut_translock);
+ if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
+ ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
+
+ LASSERT(ccb->llcc_exp);
+ if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
+ ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
+ /* drop the lock before replying to avoid holding it over RPC work */
+ cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
+ ptlrpc_commit_replies(ccb->llcc_exp);
} else {
- cfs_spin_unlock(&lut->lut_translock);
+ cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
}
- class_export_cb_put(exp);
- if (transno)
+ class_export_cb_put(ccb->llcc_exp);
+ if (ccb->llcc_transno)
CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
- lut->lut_obd->obd_name, transno);
+ ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
+ cfs_list_del(&ccb->llcc_cb.dcb_linkage);
+ OBD_FREE_PTR(ccb);
+}
+
+/* Register a last_committed commit callback for @transno on @th.
+ * Takes an extra export reference (class_export_cb_get) that the
+ * callback releases; on registration failure the reference and the
+ * cookie are released here. Returns 0 or a negative errno. */
+int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
+ struct obd_export *exp, __u64 transno)
+{
+ struct lut_last_committed_callback *ccb;
+ int rc;
+
+ OBD_ALLOC_PTR(ccb);
+ if (ccb == NULL)
+ return -ENOMEM;
+
+ ccb->llcc_cb.dcb_func = lut_cb_last_committed;
+ CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
+ ccb->llcc_lut = lut;
+ ccb->llcc_exp = class_export_cb_get(exp);
+ ccb->llcc_transno = transno;
+
+ rc = dt_trans_cb_add(th, &ccb->llcc_cb);
+ if (rc) {
+ /* undo the reference taken above */
+ class_export_cb_put(exp);
+ OBD_FREE_PTR(ccb);
+ }
+ return rc;
}
-EXPORT_SYMBOL(lut_cb_last_committed);
+EXPORT_SYMBOL(lut_last_commit_cb_add);
+
+/* Commit-callback cookie for a newly connected client's first
+ * last_rcvd record; freed by lut_cb_new_client() itself. */
+struct lut_new_client_callback {
+ struct dt_txn_commit_cb lncc_cb;
+ struct obd_export *lncc_exp;
+};
-void lut_cb_client(struct lu_target *lut, __u64 transno,
- void *data, int err)
+/* Commit callback: the client's record is durable, so clear
+ * exp_need_sync under exp_lock, drop the export reference taken by
+ * lut_new_client_cb_add() and free the cookie. */
+void lut_cb_new_client(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
{
- LASSERT(lut->lut_obd);
- target_client_add_cb(lut->lut_obd, transno, data, err);
+ struct lut_new_client_callback *ccb;
+
+ ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
+
+ LASSERT(ccb->lncc_exp->exp_obd);
+
+ CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+ ccb->lncc_exp->exp_obd->obd_name,
+ ccb->lncc_exp->exp_client_uuid.uuid);
+
+ cfs_spin_lock(&ccb->lncc_exp->exp_lock);
+ ccb->lncc_exp->exp_need_sync = 0;
+ cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
+ class_export_cb_put(ccb->lncc_exp);
+
+ cfs_list_del(&ccb->lncc_cb.dcb_linkage);
+ OBD_FREE_PTR(ccb);
+}
+
+/* Register a new-client commit callback on @th. Takes an extra export
+ * reference that lut_cb_new_client() releases; on failure the caller
+ * is expected to fall back to a synchronous commit. Returns 0 or a
+ * negative errno. */
+int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+ struct lut_new_client_callback *ccb;
+ int rc;
+
+ OBD_ALLOC_PTR(ccb);
+ if (ccb == NULL)
+ return -ENOMEM;
+
+ ccb->lncc_cb.dcb_func = lut_cb_new_client;
+ CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
+ ccb->lncc_exp = class_export_cb_get(exp);
+
+ rc = dt_trans_cb_add(th, &ccb->lncc_cb);
+ if (rc) {
+ /* undo the reference taken above */
+ class_export_cb_put(exp);
+ OBD_FREE_PTR(ccb);
+ }
+ return rc;
}
-EXPORT_SYMBOL(lut_cb_client);
+EXPORT_SYMBOL(lut_new_client_cb_add);
int lut_init(const struct lu_env *env, struct lu_target *lut,
struct obd_device *obd, struct dt_device *dt)