From 2fe22edfe3c365b5c270050fdeed0a86fa74a919 Mon Sep 17 00:00:00 2001 From: Wang Di Date: Sat, 22 Nov 2014 20:06:55 -0800 Subject: [PATCH] LU-3534 osp: send updates by separate thread Sending updates to other MDT by separate thread, so 1. The thread will sends the update by version number, i.e. lower version of update will be sent first. 2. If one operation includes updates on several MDTs, these updates can be sent on different MDTs in parallel. Though in transaction stop (top_trans_stop()), the top transaction needs to wait all sub transaction stop their transaction (i.e. updates have been executed on all MDTs), then return. 3. Move dt_update_request to osp layer, since it is only OSP layer thing, and each OSP transaction will create the osp_update_request and attached to the list of OSP sending thread. 4. Change lgh_hdr_lock from spinlock to rw_semaphore, because if the llog object is remote object, it will go into osp_md_write_obj, where it might be blocked by memory allocation. 5. a few cleanups for the pervious DNE patches http://review.whamcloud.com/11572 http://review.whamcloud.com/11737 see LU-6691, LU-6692, LU-6693. Change-Id: I2dde892e4ca4bf74c2cd56e16bb1125920365334 Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/12825 Tested-by: Jenkins Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- lustre/include/dt_object.h | 2 +- lustre/include/lu_object.h | 38 --- lustre/include/lu_target.h | 2 +- lustre/include/lustre_log.h | 2 +- lustre/include/lustre_update.h | 31 +- lustre/ldlm/ldlm_lib.c | 7 +- lustre/lod/lod_dev.c | 4 +- lustre/obdclass/llog.c | 14 +- lustre/obdclass/llog_osd.c | 34 +- lustre/osd-ldiskfs/osd_handler.c | 29 +- lustre/osd-ldiskfs/osd_internal.h | 3 +- lustre/osp/osp_dev.c | 99 +++++- lustre/osp/osp_internal.h | 108 +++++-- lustre/osp/osp_md_object.c | 87 ++--- lustre/osp/osp_object.c | 28 +- lustre/osp/osp_trans.c | 654 ++++++++++++++++++++++++++------------ lustre/target/out_handler.c | 6 +- lustre/target/update_recovery.c | 12 +- lustre/target/update_trans.c | 114 ++++++- 19 files changed, 895 insertions(+), 379 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 4b98848..dd3f40b 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -104,7 +104,7 @@ typedef void (*dt_cb_t)(struct lu_env *env, struct thandle *th, #define TRANS_COMMIT_CB_MAGIC 0xa0a00a0a #define MAX_COMMIT_CB_STR_LEN 32 -#define DCB_TRANS_NOT_COMMITTED 0x1 +#define DCB_TRANS_STOP 0x1 struct dt_txn_commit_cb { struct list_head dcb_linkage; dt_cb_t dcb_func; diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 39d9d22..c11a2a7 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -439,44 +439,6 @@ struct lu_attr { __u64 la_valid; }; -static inline void lu_attr_cpu_to_le(struct lu_attr *dst_attr, - struct lu_attr *src_attr) -{ - dst_attr->la_size = cpu_to_le64(src_attr->la_size); - dst_attr->la_mtime = cpu_to_le64(src_attr->la_mtime); - dst_attr->la_atime = cpu_to_le64(src_attr->la_atime); - dst_attr->la_ctime = cpu_to_le64(src_attr->la_ctime); - dst_attr->la_blocks = cpu_to_le64(src_attr->la_blocks); - dst_attr->la_mode = cpu_to_le32(src_attr->la_mode); - dst_attr->la_uid = cpu_to_le32(src_attr->la_uid); - dst_attr->la_gid = cpu_to_le32(src_attr->la_gid); - dst_attr->la_flags = cpu_to_le32(src_attr->la_flags); - dst_attr->la_nlink = cpu_to_le32(src_attr->la_nlink); - dst_attr->la_blkbits = cpu_to_le32(src_attr->la_blkbits); - dst_attr->la_blksize = cpu_to_le32(src_attr->la_blksize); - dst_attr->la_rdev = cpu_to_le32(src_attr->la_rdev); - dst_attr->la_valid = cpu_to_le64(src_attr->la_valid); -} - -static inline void lu_attr_le_to_cpu(struct lu_attr *dst_attr, - struct lu_attr *src_attr) -{ - dst_attr->la_size = le64_to_cpu(src_attr->la_size); - dst_attr->la_mtime = le64_to_cpu(src_attr->la_mtime); - dst_attr->la_atime = le64_to_cpu(src_attr->la_atime); - dst_attr->la_ctime = le64_to_cpu(src_attr->la_ctime); - dst_attr->la_blocks = le64_to_cpu(src_attr->la_blocks); - dst_attr->la_mode = le32_to_cpu(src_attr->la_mode); - dst_attr->la_uid = le32_to_cpu(src_attr->la_uid); - dst_attr->la_gid = le32_to_cpu(src_attr->la_gid); - dst_attr->la_flags = le32_to_cpu(src_attr->la_flags); - dst_attr->la_nlink = le32_to_cpu(src_attr->la_nlink); - dst_attr->la_blkbits = le32_to_cpu(src_attr->la_blkbits); - dst_attr->la_blksize = le32_to_cpu(src_attr->la_blksize); - dst_attr->la_rdev = le32_to_cpu(src_attr->la_rdev); - dst_attr->la_valid = le64_to_cpu(src_attr->la_valid); -} - /** Bit-mask of valid attributes */ enum la_valid { LA_ATIME = 1 << 0, diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 6c4c384..4b5b7d7 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -417,7 +417,7 @@ int distribute_txn_replay_handle(struct lu_env *env, __u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd); struct distribute_txn_replay_req * distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd); -void dtrq_destory(struct distribute_txn_replay_req *dtrq); +void dtrq_destroy(struct distribute_txn_replay_req *dtrq); struct distribute_txn_replay_req_sub * dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index); diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 52bc7be..18e9a3a 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -292,7 +292,7 @@ struct llog_operations { /* In-memory descriptor for a log object or log catalog */ struct llog_handle { struct rw_semaphore lgh_lock; - spinlock_t lgh_hdr_lock; /* protect lgh_hdr data */ + struct rw_semaphore lgh_hdr_lock; /* protect lgh_hdr data */ struct llog_logid lgh_id; /* id of this log */ struct llog_log_hdr *lgh_hdr; struct dt_object *lgh_obj; diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 1417804..27d56ee 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -41,27 +41,6 @@ struct dt_key; struct dt_rec; struct object_update_param; -struct update_buffer { - struct object_update_request *ub_req; - size_t ub_req_size; -}; - -/** - * Tracking the updates being executed on this dt_device. - */ -struct dt_update_request { - struct dt_device *dur_dt; - /* attached itself to thandle */ - int dur_flags; - /* update request result */ - int dur_rc; - /* Current batch(transaction) id */ - __u64 dur_batchid; - /* Holding object updates */ - struct update_buffer dur_buf; - struct list_head dur_cb_items; -}; - struct update_params { struct object_update_param up_params[0]; }; @@ -116,7 +95,7 @@ update_params_get_param_buf(const struct update_params *params, __u16 index, if (size != NULL) *size = param->oup_len; - return ¶m->oup_buf[0]; + return param->oup_buf; } struct update_op { @@ -360,6 +339,7 @@ struct top_multiple_thandle { /* All of update records will packed here */ struct thandle_update_records *tmt_update_records; + wait_queue_head_t tmt_stop_waitq; __u64 tmt_batchid; int tmt_result; __u32 tmt_magic; @@ -382,23 +362,24 @@ struct top_thandle { struct sub_thandle { struct thandle *st_sub_th; struct dt_device *st_dt; - struct list_head st_list; struct llog_cookie st_cookie; struct dt_txn_commit_cb st_commit_dcb; + struct dt_txn_commit_cb st_stop_dcb; int st_result; /* linked to top_thandle */ struct list_head st_sub_list; /* If this sub thandle is committed */ - bool st_committed:1; + bool st_committed:1, + st_stopped:1; }; struct tx_arg; typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, struct tx_arg *ta); -/* Structure for holding one update executation */ +/* Structure for holding one update execution */ struct tx_arg { tx_exec_func_t exec_fn; tx_exec_func_t undo_fn; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 2c288c2..cb80676 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -1732,7 +1732,6 @@ static int check_for_next_transno(struct lu_target *lut) if (lut->lut_tdtd != NULL) { struct target_distribute_txn_data *tdtd; - __u64 update_transno; tdtd = lut->lut_tdtd; update_transno = distribute_txn_get_next_transno(lut->lut_tdtd); @@ -1754,8 +1753,8 @@ static int check_for_next_transno(struct lu_target *lut) } else if (obd->obd_recovery_expired) { CDEBUG(D_HA, "waking for expired recovery\n"); wake_up = 1; - } else if (req_transno == next_transno || (update_transno != 0 && - update_transno <= next_transno)) { + } else if (req_transno == next_transno || + (update_transno != 0 && update_transno <= next_transno)) { CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); wake_up = 1; } else if (queue_len > 0 && @@ -2263,7 +2262,7 @@ static void replay_request_or_update(struct lu_env *env, obd->obd_next_recovery_transno = transno; spin_unlock(&obd->obd_recovery_task_lock); target_update_lcd(env, lut, dtrq); - dtrq_destory(dtrq); + dtrq_destroy(dtrq); } else { spin_unlock(&obd->obd_recovery_task_lock); LASSERT(list_empty(&obd->obd_req_replay_queue)); diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 7c7a1cb..02b41d6 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -316,8 +316,8 @@ static int lod_process_recovery_updates(const struct lu_env *env, if (rec->lrh_len != llog_update_record_size((struct llog_update_record *)rec)) { - CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n", - lod2obd(lrd->lrd_lod)->obd_name, index, + CERROR("%s broken update record! index %u "DOSTID":%u :" + " rc = %d\n", lod2obd(lrd->lrd_lod)->obd_name, index, POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO); return -EIO; } diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 4e3ae10..33d2ea3 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -65,7 +65,7 @@ static struct llog_handle *llog_alloc_handle(void) return NULL; init_rwsem(&loghandle->lgh_lock); - spin_lock_init(&loghandle->lgh_hdr_lock); + init_rwsem(&loghandle->lgh_hdr_lock); INIT_LIST_HEAD(&loghandle->u.phd.phd_entry); atomic_set(&loghandle->lgh_refcount, 1); @@ -121,9 +121,9 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, RETURN(-EINVAL); } - spin_lock(&loghandle->lgh_hdr_lock); + down_write(&loghandle->lgh_hdr_lock); if (!ext2_clear_bit(index, llh->llh_bitmap)) { - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index); RETURN(-ENOENT); } @@ -133,7 +133,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && (llh->llh_count == 1) && (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); rc = llog_destroy(env, loghandle); if (rc < 0) { CERROR("%s: can't destroy empty llog #"DOSTID @@ -145,7 +145,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, } RETURN(LLOG_DEL_PLAIN); } - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX); if (rc < 0) { @@ -158,10 +158,10 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle, } RETURN(0); out_err: - spin_lock(&loghandle->lgh_hdr_lock); + down_write(&loghandle->lgh_hdr_lock); ext2_set_bit(index, llh->llh_bitmap); llh->llh_count++; - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); return rc; } diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index e81fb9e..ad843e9 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -507,15 +507,24 @@ static int llog_osd_write_rec(const struct lu_env *env, /* the lgh_hdr_lock protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ - spin_lock(&loghandle->lgh_hdr_lock); + down_write(&loghandle->lgh_hdr_lock); if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("%s: index %u already set in log bitmap\n", o->do_lu.lo_dev->ld_obd->obd_name, index); - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); LBUG(); /* should never happen */ } llh->llh_count++; - spin_unlock(&loghandle->lgh_hdr_lock); + + /* XXX It is a bit tricky here, if the log object is local, + * we do not need lock during write here, because if there is + * race, the transaction(jbd2, what about ZFS?) will make sure the + * conflicts will all committed in the same transaction group. + * But for remote object, we need lock the whole process, so to + * set the version of the remote transaction to make sure they + * are being sent in order. (see osp_md_write()) */ + if (!dt_object_remote(o)) + up_write(&loghandle->lgh_hdr_lock); if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; @@ -523,7 +532,7 @@ static int llog_osd_write_rec(const struct lu_env *env, lgi->lgi_buf.lb_buf = &llh->llh_hdr; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_remote_unlock, rc); } else { /* Note: If this is not initialization (size == 0), then do not * write the whole header (8k bytes), only update header/tail @@ -538,7 +547,7 @@ static int llog_osd_write_rec(const struct lu_env *env, lgi->lgi_buf.lb_buf = &llh->llh_count; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_remote_unlock, rc); lgi->lgi_off = offsetof(typeof(*llh), llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]); @@ -547,16 +556,23 @@ static int llog_osd_write_rec(const struct lu_env *env, &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)]; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_remote_unlock, rc); lgi->lgi_off = offsetof(typeof(*llh), llh_tail); lgi->lgi_buf.lb_len = sizeof(llh->llh_tail); lgi->lgi_buf.lb_buf = &llh->llh_tail; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_remote_unlock, rc); } +out_remote_unlock: + /* unlock here for remote object */ + if (dt_object_remote(o)) + up_write(&loghandle->lgh_hdr_lock); + if (rc) + GOTO(out, rc); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -586,10 +602,10 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); out: /* cleanup llog for error case */ - spin_lock(&loghandle->lgh_hdr_lock); + down_write(&loghandle->lgh_hdr_lock); ext2_clear_bit(index, llh->llh_bitmap); llh->llh_count--; - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); /* restore llog last_idx */ loghandle->lgh_last_idx--; diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 3ceb854..1815db2 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -831,7 +831,8 @@ static void osd_trans_commit_cb(struct super_block *sb, dt_txn_hook_commit(th); /* call per-transaction callbacks if any */ - list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) { + list_for_each_entry_safe(dcb, tmp, &oh->ot_commit_dcb_list, + dcb_linkage) { LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, "commit callback entry: magic=%x name='%s'\n", dcb->dcb_magic, dcb->dcb_name); @@ -870,7 +871,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th->th_result = 0; th->th_tags = LCT_TX_HANDLE; oh->ot_credits = 0; - INIT_LIST_HEAD(&oh->ot_dcb_list); + INIT_LIST_HEAD(&oh->ot_commit_dcb_list); + INIT_LIST_HEAD(&oh->ot_stop_dcb_list); osd_th_alloced(oh); memset(oti->oti_declare_ops, 0, @@ -1006,6 +1008,22 @@ static int osd_seq_exists(const struct lu_env *env, RETURN(ss->ss_node_id == range->lsr_index); } +static void osd_trans_stop_cb(struct osd_thandle *oth, int result) +{ + struct dt_txn_commit_cb *dcb; + struct dt_txn_commit_cb *tmp; + + /* call per-transaction stop callbacks if any */ + list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list, + dcb_linkage) { + LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, + "commit callback entry: magic=%x name='%s'\n", + dcb->dcb_magic, dcb->dcb_name); + list_del_init(&dcb->dcb_linkage); + dcb->dcb_func(NULL, &oth->ot_super, dcb, result); + } +} + /* * Concurrency: shouldn't matter. */ @@ -1045,6 +1063,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, CERROR("%s: failed in transaction hook: rc = %d\n", osd_name(osd), rc); + osd_trans_stop_cb(oh, rc); /* hook functions might modify th_sync */ hdl->h_sync = th->th_sync; @@ -1054,6 +1073,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, CERROR("%s: failed to stop transaction: rc = %d\n", osd_name(osd), rc); } else { + osd_trans_stop_cb(oh, th->th_result); OBD_FREE_PTR(oh); } @@ -1085,7 +1105,10 @@ static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC); LASSERT(&dcb->dcb_func != NULL); - list_add(&dcb->dcb_linkage, &oh->ot_dcb_list); + if (dcb->dcb_flags & DCB_TRANS_STOP) + list_add(&dcb->dcb_linkage, &oh->ot_stop_dcb_list); + else + list_add(&dcb->dcb_linkage, &oh->ot_commit_dcb_list); return 0; } diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 2cc5ca6..683deb8 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -330,7 +330,8 @@ struct osd_thandle { struct thandle ot_super; handle_t *ot_handle; struct ldiskfs_journal_cb_entry ot_jcb; - struct list_head ot_dcb_list; + struct list_head ot_commit_dcb_list; + struct list_head ot_stop_dcb_list; /* Link to the device, for debugging. */ struct lu_ref_link ot_dev_link; unsigned short ot_credits; diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 73efec3..b9ea13b 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -74,6 +74,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include #include #include @@ -483,6 +484,95 @@ static int osp_disconnect(struct osp_device *d) } /** + * Initialize the osp_update structure in OSP device + * + * Allocate osp update structure and start update thread. + * + * \param[in] osp OSP device + * + * \retval 0 if initialization succeeds. + * \retval negative errno if initialization fails. + */ +static int osp_update_init(struct osp_device *osp) +{ + struct l_wait_info lwi = { 0 }; + struct task_struct *task; + + ENTRY; + + LASSERT(osp->opd_connect_mdt); + + OBD_ALLOC_PTR(osp->opd_update); + if (osp->opd_update == NULL) + RETURN(-ENOMEM); + + init_waitqueue_head(&osp->opd_update_thread.t_ctl_waitq); + init_waitqueue_head(&osp->opd_update->ou_waitq); + spin_lock_init(&osp->opd_update->ou_lock); + INIT_LIST_HEAD(&osp->opd_update->ou_list); + osp->opd_update->ou_rpc_version = 1; + osp->opd_update->ou_version = 1; + + /* start thread handling sending updates to the remote MDT */ + task = kthread_run(osp_send_update_thread, osp, + "osp_up%u-%u", osp->opd_index, osp->opd_group); + if (IS_ERR(task)) { + int rc = PTR_ERR(task); + + OBD_FREE_PTR(osp->opd_update); + osp->opd_update = NULL; + CERROR("%s: can't start precreate thread: rc = %d\n", + osp->opd_obd->obd_name, rc); + RETURN(rc); + } + + l_wait_event(osp->opd_update_thread.t_ctl_waitq, + osp_send_update_thread_running(osp) || + osp_send_update_thread_stopped(osp), &lwi); + + RETURN(0); +} + +/** + * Finialize osp_update structure in OSP device + * + * Stop the OSP update sending thread, then delete the left + * osp thandle in the sending list. + * + * \param [in] osp OSP device. + */ +static void osp_update_fini(const struct lu_env *env, struct osp_device *osp) +{ + struct osp_update_request *our; + struct osp_update_request *tmp; + struct osp_updates *ou = osp->opd_update; + + if (ou == NULL) + return; + + osp->opd_update_thread.t_flags = SVC_STOPPING; + wake_up(&ou->ou_waitq); + + wait_event(osp->opd_update_thread.t_ctl_waitq, + osp->opd_update_thread.t_flags & SVC_STOPPED); + + /* Remove the left osp thandle from the list */ + spin_lock(&ou->ou_lock); + list_for_each_entry_safe(our, tmp, &ou->ou_list, + our_list) { + list_del_init(&our->our_list); + LASSERT(our->our_th != NULL); + osp_trans_callback(env, our->our_th, -EIO); + /* our will be destroyed in osp_thandle_put() */ + osp_thandle_put(our->our_th); + } + spin_unlock(&ou->ou_lock); + + OBD_FREE_PTR(ou); + osp->opd_update = NULL; +} + +/** * Cleanup OSP, which includes disconnect import, cleanup unlink log, stop * precreate threads etc. * @@ -543,6 +633,7 @@ static int osp_process_config(const struct lu_env *env, switch (lcfg->lcfg_command) { case LCFG_PRE_CLEANUP: rc = osp_disconnect(d); + osp_update_fini(env, d); break; case LCFG_CLEANUP: lu_dev_del_linkage(dev->ld_site, dev); @@ -1082,6 +1173,10 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp, rc = osp_sync_init(env, osp); if (rc < 0) GOTO(out_precreat, rc); + } else { + rc = osp_update_init(osp); + if (rc != 0) + GOTO(out_fid, rc); } ns_register_cancel(obd->obd_namespace, osp_cancel_weight); @@ -1109,6 +1204,8 @@ out_precreat: /* stop precreate thread */ if (!osp->opd_connect_mdt) osp_precreate_fini(osp); + else + osp_update_fini(env, osp); out_last_used: if (!osp->opd_connect_mdt) osp_last_used_fini(env, osp); @@ -1222,7 +1319,7 @@ static struct lu_device *osp_device_fini(const struct lu_env *env, ENTRY; if (osp->opd_async_requests != NULL) { - dt_update_request_destroy(osp->opd_async_requests); + osp_update_request_destroy(osp->opd_async_requests); osp->opd_async_requests = NULL; } diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 4198f45..d6cf452 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -94,6 +94,36 @@ struct osp_precreate { int osp_pre_recovering; }; +/** + * Tracking the updates being executed on this dt_device. + */ +struct osp_update_request { + int our_flags; + /* update request result */ + int our_rc; + + /* Holding object updates sent to the remote target */ + struct object_update_request *our_req; + size_t our_req_size; + + struct list_head our_cb_items; + + /* points to thandle if this update request belongs to one */ + struct osp_thandle *our_th; + /* linked to the list(ou_list) in osp_updates */ + struct list_head our_list; + __u32 our_req_sent:1; +}; + +struct osp_updates { + struct list_head ou_list; + spinlock_t ou_lock; + wait_queue_head_t ou_waitq; + /* wait for next updates */ + __u64 ou_rpc_version; + __u64 ou_version; +}; + struct osp_device { struct dt_device opd_dt_dev; /* corresponded OST index */ @@ -141,6 +171,11 @@ struct osp_device { /* thread waits for signals about pool going empty */ wait_queue_head_t opd_pre_waitq; + /* send update thread */ + struct osp_updates *opd_update; + /* dedicate update thread */ + struct ptlrpc_thread opd_update_thread; + /* * OST synchronization */ @@ -196,7 +231,7 @@ struct osp_device { * osp_device::opd_async_requests via declare() functions, these * requests can be packed together and sent to the remote server * via single OUT RPC later. */ - struct dt_update_request *opd_async_requests; + struct osp_update_request *opd_async_requests; /* Protect current operations on opd_async_requests. */ struct mutex opd_async_requests_mutex; struct list_head opd_async_updates; @@ -296,14 +331,18 @@ struct osp_it { struct page **ooi_pages; }; +#define OSP_THANDLE_MAGIC 0x20141214 struct osp_thandle { struct thandle ot_super; - struct dt_update_request *ot_dur; /* OSP will use this thandle to update last oid*/ struct thandle *ot_storage_th; - struct list_head ot_dcb_list; + __u32 ot_magic; + struct list_head ot_commit_dcb_list; + struct list_head ot_stop_dcb_list; + struct osp_update_request *ot_our; atomic_t ot_refcount; + __u64 ot_version; }; static inline struct osp_thandle * @@ -312,13 +351,13 @@ thandle_to_osp_thandle(struct thandle *th) return container_of(th, struct osp_thandle, ot_super); } -static inline struct dt_update_request * -thandle_to_dt_update_request(struct thandle *th) +static inline struct osp_update_request * +thandle_to_osp_update_request(struct thandle *th) { struct osp_thandle *oth; oth = thandle_to_osp_thandle(th); - return oth->ot_dur; + return oth->ot_our; } /* The transaction only include the updates on the remote node, and @@ -542,7 +581,7 @@ update_buffer_get_update(struct object_update_request *request, unsigned int index); int osp_extend_update_buffer(const struct lu_env *env, - struct update_buffer *ubuf); + struct osp_update_request *our); #define osp_update_rpc_pack(env, name, update, op, ...) \ ({ \ @@ -552,25 +591,27 @@ int osp_extend_update_buffer(const struct lu_env *env, int ret; \ \ while (1) { \ - ureq = update->dur_buf.ub_req; \ - max_update_length = update->dur_buf.ub_req_size - \ + ureq = update->our_req; \ + max_update_length = update->our_req_size - \ object_update_request_size(ureq); \ \ object_update = update_buffer_get_update(ureq, \ - ureq->ourq_count); \ - ret = out_##name##_pack(env, object_update, max_update_length, \ + ureq->ourq_count); \ + ret = out_##name##_pack(env, object_update, \ + max_update_length, \ __VA_ARGS__); \ if (ret == -E2BIG) { \ int rc1; \ /* extend the buffer and retry */ \ - rc1 = osp_extend_update_buffer(env, &update->dur_buf); \ + rc1 = osp_extend_update_buffer(env, update); \ if (rc1 != 0) { \ ret = rc1; \ break; \ } \ } else { \ if (ret == 0) { \ - object_update->ou_flags |= update->dur_flags; \ + object_update->ou_flags |= \ + update->our_flags; \ ureq->ourq_count++; \ } \ break; \ @@ -579,6 +620,16 @@ int osp_extend_update_buffer(const struct lu_env *env, ret; \ }) +static inline bool osp_send_update_thread_running(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_RUNNING; +} + +static inline bool osp_send_update_thread_stopped(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_STOPPED; +} + typedef int (*osp_update_interpreter_t)(const struct lu_env *env, struct object_update_reply *rep, struct ptlrpc_request *req, @@ -597,32 +648,47 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update); + struct osp_update_request *update); int osp_trans_update_request_create(struct thandle *th); struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d); int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th); int osp_insert_update_callback(const struct lu_env *env, - struct dt_update_request *update, + struct osp_update_request *update, struct osp_object *obj, void *data, osp_update_interpreter_t interpreter); -int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, - const struct object_update_request *ureq, - struct ptlrpc_request **reqp); -struct dt_update_request *dt_update_request_create(struct dt_device *dt); -void dt_update_request_destroy(struct dt_update_request *dt_update); + +struct osp_update_request *osp_update_request_create(struct dt_device *dt); +void osp_update_request_destroy(struct osp_update_request *update); + +int osp_send_update_thread(void *arg); +int osp_check_and_set_rpc_version(struct osp_thandle *oth); + +void osp_thandle_destroy(struct osp_thandle *oth); +static inline void osp_thandle_get(struct osp_thandle *oth) +{ + atomic_inc(&oth->ot_refcount); +} + +static inline void osp_thandle_put(struct osp_thandle *oth) +{ + if (atomic_dec_and_test(&oth->ot_refcount)) + osp_thandle_destroy(oth); +} int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp); int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update, + struct osp_update_request *update, struct ptlrpc_request **reqp); struct thandle *osp_get_storage_thandle(const struct lu_env *env, struct thandle *th, struct osp_device *osp); +void osp_trans_callback(const struct lu_env *env, + struct osp_thandle *oth, int rc); /* osp_object.c */ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 3acbb16..46de68b 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -104,7 +104,7 @@ static int osp_object_create_interpreter(const struct lu_env *env, /** * Implementation of dt_object_operations::do_declare_create * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -154,25 +154,25 @@ update_buffer_get_update(struct object_update_request *request, } int osp_extend_update_buffer(const struct lu_env *env, - struct update_buffer *ubuf) + struct osp_update_request *our) { - struct object_update_request *ureq; - size_t new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD; + struct object_update_request *obj_update_req; + size_t new_size = our->our_req_size + OUT_UPDATE_BUFFER_SIZE_ADD; /* enlarge object update request size */ if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX) return -E2BIG; - OBD_ALLOC_LARGE(ureq, new_size); - if (ureq == NULL) + OBD_ALLOC_LARGE(obj_update_req, new_size); + if (obj_update_req == NULL) return -ENOMEM; - memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size); + memcpy(obj_update_req, our->our_req, our->our_req_size); - OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size); + OBD_FREE_LARGE(our->our_req, our->our_req_size); - ubuf->ub_req = ureq; - ubuf->ub_req_size = new_size; + our->our_req = obj_update_req; + our->our_req_size = new_size; return 0; } @@ -197,11 +197,11 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; struct osp_object *obj = dt2osp_obj(dt); int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); LASSERT(attr->la_valid & LA_TYPE); @@ -228,7 +228,7 @@ out: /** * Implementation of dt_object_operations::do_declare_ref_del * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -260,10 +260,10 @@ static int osp_md_declare_ref_del(const struct lu_env *env, static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL, @@ -274,7 +274,7 @@ static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, /** * Implementation of dt_object_operations::do_declare_ref_del * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -306,10 +306,10 @@ static int osp_md_declare_ref_add(const struct lu_env *env, static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD, @@ -346,7 +346,7 @@ static void osp_md_ah_init(const struct lu_env *env, /** * Implementation of dt_object_operations::do_declare_attr_get * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -382,10 +382,10 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET, @@ -506,7 +506,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct dt_device *dt_dev = &osp->opd_dt_dev; - struct dt_update_request *update; + struct osp_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; struct lu_fid *fid; @@ -517,7 +517,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = dt_update_request_create(dt_dev); + update = osp_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); @@ -573,7 +573,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - dt_update_request_destroy(update); + osp_update_request_destroy(update); return rc; } @@ -581,7 +581,7 @@ out: /** * Implementation of dt_index_operations::dio_declare_insert * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -625,10 +625,12 @@ static int osp_md_index_insert(const struct lu_env *env, struct thandle *th, int ignore_quota) { - struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct dt_update_request *update = oth->ot_dur; + struct osp_update_request *update; int rc; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT, lu_object_fid(&dt->do_lu), rec, key); return rc; @@ -637,7 +639,7 @@ static int osp_md_index_insert(const struct lu_env *env, /** * Implementation of dt_index_operations::dio_declare_delete * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -675,10 +677,10 @@ static int osp_md_index_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE, @@ -1026,14 +1028,14 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, { struct osp_object *o = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct dt_update_request *update; + struct osp_update_request *update; int rc = 0; ENTRY; o->opo_non_exist = 1; LASSERT(osp->opd_connect_mdt); - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY, @@ -1077,7 +1079,7 @@ struct dt_object_operations osp_md_obj_ops = { /** * Implementation of dt_body_operations::dbo_declare_write * - * Create the dt_update_request to track the update for this OSP + * Create the osp_update_request to track the update for this OSP * in the transaction. * * \param[in] env execution environment @@ -1117,12 +1119,13 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *th, int ignore_quota) { - struct osp_object *obj = dt2osp_obj(dt); - struct dt_update_request *update; + struct osp_object *obj = dt2osp_obj(dt); + struct osp_update_request *update; + struct osp_thandle *oth = thandle_to_osp_thandle(th); ssize_t rc; ENTRY; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, write, update, OUT_WRITE, @@ -1141,6 +1144,10 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, obj->opo_ooa->ooa_attr.la_size < *pos) obj->opo_ooa->ooa_attr.la_size = *pos; + rc = osp_check_and_set_rpc_version(oth); + if (rc < 0) + RETURN(rc); + RETURN(buf->lb_len); } @@ -1150,7 +1157,7 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct dt_device *dt_dev = &osp->opd_dt_dev; struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; - struct dt_update_request *update; + struct osp_update_request *update; struct object_update_reply *reply; struct out_read_reply *orr; struct ptlrpc_request *req = NULL; @@ -1159,8 +1166,8 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, /* Because it needs send the update buffer right away, * just create an update buffer, instead of attaching the - * update_remote list of the thandle. */ - update = dt_update_request_create(dt_dev); + * update_remote list of the thandle. */ + update = osp_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); @@ -1211,7 +1218,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - dt_update_request_destroy(update); + osp_update_request_destroy(update); return rc; } diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index bd51a8e..ff6107e 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -541,7 +541,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct osp_object *obj = dt2osp_obj(dt); struct dt_device *dev = &osp->opd_dt_dev; - struct dt_update_request *update; + struct osp_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; int rc = 0; @@ -561,7 +561,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, spin_unlock(&obj->opo_lock); } - update = dt_update_request_create(dev); + update = osp_update_request_create(dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); @@ -607,7 +607,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - dt_update_request_destroy(update); + osp_update_request_destroy(update); return rc; } @@ -857,7 +857,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, mutex_unlock(&osp->opd_async_requests_mutex); osp_oac_xattr_put(oxe); } else { - struct dt_update_request *update; + struct osp_update_request *update; /* XXX: Currently, we trigger the batched async OUT * RPC via dt_declare_xattr_get(). It is not @@ -865,8 +865,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, * * We will improve it in the future. */ update = osp->opd_async_requests; - if (update != NULL && update->dur_buf.ub_req != NULL && - update->dur_buf.ub_req->ourq_count > 0) { + if (update != NULL && update->our_req != NULL && + update->our_req->ourq_count > 0) { osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); rc = osp_unplug_async_request(env, osp, update); @@ -908,7 +908,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, struct osp_object *obj = dt2osp_obj(dt); struct dt_device *dev = &osp->opd_dt_dev; struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - struct dt_update_request *update = NULL; + struct osp_update_request *update = NULL; struct ptlrpc_request *req = NULL; struct object_update_reply *reply; struct osp_xattr_entry *oxe = NULL; @@ -959,7 +959,7 @@ unlock: spin_unlock(&obj->opo_lock); } - update = dt_update_request_create(dev); + update = osp_update_request_create(dev); if (IS_ERR(update)) GOTO(out, rc = PTR_ERR(update)); @@ -1077,7 +1077,7 @@ out: ptlrpc_req_finished(req); if (update != NULL && !IS_ERR(update)) - dt_update_request_destroy(update); + osp_update_request_destroy(update); if (oxe != NULL) osp_oac_xattr_put(oxe); @@ -1141,12 +1141,12 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); - struct dt_update_request *update; + struct osp_update_request *update; struct osp_xattr_entry *oxe; int rc; ENTRY; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n", @@ -1245,13 +1245,13 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_update_request *update; + struct osp_update_request *update; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osp_object *o = dt2osp_obj(dt); + struct osp_object *o = dt2osp_obj(dt); struct osp_xattr_entry *oxe; int rc; - update = thandle_to_dt_update_request(th); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL, diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index b6e5cf4..6d0067a 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -72,7 +72,7 @@ * The argument for the interpreter callback of osp request. */ struct osp_update_args { - struct dt_update_request *oaua_update; + struct osp_update_request *oaua_update; atomic_t *oaua_count; wait_queue_head_t *oaua_waitq; bool oaua_flow_control; @@ -82,7 +82,7 @@ struct osp_update_args { * Call back for each update request. */ struct osp_update_callback { - /* list in the dt_update_request::dur_cb_items */ + /* list in the osp_update_request::our_cb_items */ struct list_head ouc_list; /* The target of the async update request. */ @@ -117,55 +117,49 @@ static void object_update_request_free(struct object_update_request *ourq, } /** - * Allocate and initialize dt_update_request + * Allocate and initialize osp_update_request * - * dt_update_request is being used to track updates being executed on + * osp_update_request is being used to track updates being executed on * this dt_device(OSD or OSP). The update buffer will be 4k initially, * and increased if needed. * * \param [in] dt dt device * - * \retval dt_update_request being allocated if succeed + * \retval osp_update_request being allocated if succeed * \retval ERR_PTR(errno) if failed */ -struct dt_update_request *dt_update_request_create(struct dt_device *dt) +struct osp_update_request *osp_update_request_create(struct dt_device *dt) { - struct dt_update_request *dt_update; + struct osp_update_request *osp_update_req; struct object_update_request *ourq; - OBD_ALLOC_PTR(dt_update); - if (dt_update == NULL) + OBD_ALLOC_PTR(osp_update_req); + if (osp_update_req == NULL) return ERR_PTR(-ENOMEM); ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE); if (IS_ERR(ourq)) { - OBD_FREE_PTR(dt_update); + OBD_FREE_PTR(osp_update_req); return ERR_CAST(ourq); } - dt_update->dur_buf.ub_req = ourq; - dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; + osp_update_req->our_req = ourq; + osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; - dt_update->dur_dt = dt; - dt_update->dur_batchid = 0; - INIT_LIST_HEAD(&dt_update->dur_cb_items); + INIT_LIST_HEAD(&osp_update_req->our_cb_items); + INIT_LIST_HEAD(&osp_update_req->our_list); - return dt_update; + return osp_update_req; } -/** - * Destroy dt_update_request - * - * \param [in] dt_update dt_update_request being destroyed - */ -void dt_update_request_destroy(struct dt_update_request *dt_update) +void osp_update_request_destroy(struct osp_update_request *our) { - if (dt_update == NULL) + if (our == NULL) return; - object_update_request_free(dt_update->dur_buf.ub_req, - dt_update->dur_buf.ub_req_size); - OBD_FREE_PTR(dt_update); + object_update_request_free(our->our_req, + our->our_req_size); + OBD_FREE_PTR(our); } static void @@ -195,6 +189,22 @@ object_update_request_dump(const struct object_update_request *ourq, ourq->ourq_magic, ourq->ourq_count, total_size); } +static void osp_trans_stop_cb(struct osp_thandle *oth, int result) +{ + struct dt_txn_commit_cb *dcb; + struct dt_txn_commit_cb *tmp; + + /* call per-transaction stop callbacks if any */ + list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list, + dcb_linkage) { + LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, + "commit callback entry: magic=%x name='%s'\n", + dcb->dcb_magic, dcb->dcb_name); + list_del_init(&dcb->dcb_linkage); + dcb->dcb_func(NULL, &oth->ot_super, dcb, result); + } +} + /** * Allocate an osp request and initialize it with the given parameters. * @@ -258,16 +268,28 @@ static int osp_update_interpret(const struct lu_env *env, { struct object_update_reply *reply = NULL; struct osp_update_args *oaua = arg; - struct dt_update_request *dt_update = oaua->oaua_update; + struct osp_update_request *our = oaua->oaua_update; + struct osp_thandle *oth; struct osp_update_callback *ouc; struct osp_update_callback *next; int count = 0; int index = 0; int rc1 = 0; - if (oaua->oaua_flow_control) - obd_put_request_slot( - &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli); + ENTRY; + + if (our == NULL) + RETURN(0); + + oaua->oaua_update = NULL; + oth = our->our_th; + if (oaua->oaua_flow_control) { + struct osp_device *osp; + + LASSERT(oth != NULL); + osp = dt2osp_dev(oth->ot_super.th_dev); + obd_put_request_slot(&osp->opd_obd->u.cli); + } /* Unpack the results from the reply message. */ if (req->rq_repmsg != NULL) { @@ -282,8 +304,7 @@ static int osp_update_interpret(const struct lu_env *env, rc1 = rc; } - list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items, - ouc_list) { + list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) { list_del_init(&ouc->ouc_list); /* The peer may only have handled some requests (indicated @@ -314,9 +335,16 @@ static int osp_update_interpret(const struct lu_env *env, if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count)) wake_up_all(oaua->oaua_waitq); - dt_update_request_destroy(dt_update); + if (oth != NULL) { + /* oth and osp_update_requests will be destoryed in + * osp_thandle_put */ + osp_trans_stop_cb(oth, rc); + osp_thandle_put(oth); + } else { + osp_update_request_destroy(our); + } - return 0; + RETURN(0); } /** @@ -325,27 +353,27 @@ static int osp_update_interpret(const struct lu_env *env, * * \param[in] env pointer to the thread context * \param[in] osp pointer to the OSP device - * \param[in] update pointer to the shared queue + * \param[in] our pointer to the shared queue * * \retval 0 for success * \retval negative error number on failure */ int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update) + struct osp_update_request *our) { struct osp_update_args *args; struct ptlrpc_request *req = NULL; int rc; rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - update->dur_buf.ub_req, &req); + our->our_req, &req); if (rc != 0) { struct osp_update_callback *ouc; struct osp_update_callback *next; list_for_each_entry_safe(ouc, next, - &update->dur_cb_items, ouc_list) { + &our->our_cb_items, ouc_list) { list_del_init(&ouc->ouc_list); if (ouc->ouc_interpreter != NULL) ouc->ouc_interpreter(env, NULL, NULL, @@ -353,10 +381,10 @@ int osp_unplug_async_request(const struct lu_env *env, ouc->ouc_data, 0, rc); osp_update_callback_fini(env, ouc); } - dt_update_request_destroy(update); + osp_update_request_destroy(our); } else { args = ptlrpc_req_async_args(req); - args->oaua_update = update; + args->oaua_update = our; args->oaua_count = NULL; args->oaua_waitq = NULL; args->oaua_flow_control = false; @@ -372,33 +400,35 @@ int osp_unplug_async_request(const struct lu_env *env, * request queue - osp_device::opd_async_requests. * * If the osp_device::opd_async_requests is not NULL, then return it directly; - * otherwise create new dt_update_request and attach it to opd_async_requests. + * otherwise create new osp_update_request and attach it to opd_async_requests. * * \param[in] osp pointer to the OSP device * * \retval pointer to the shared queue * \retval negative error number on failure */ -static struct dt_update_request * +static struct osp_update_request * osp_find_or_create_async_update_request(struct osp_device *osp) { - struct dt_update_request *update = osp->opd_async_requests; + struct osp_update_request *our = osp->opd_async_requests; + + if (our != NULL) + return our; - if (update != NULL) - return update; + our = osp_update_request_create(&osp->opd_dt_dev); + if (IS_ERR(our)) + return our; - update = dt_update_request_create(&osp->opd_dt_dev); - if (!IS_ERR(update)) - osp->opd_async_requests = update; + osp->opd_async_requests = our; - return update; + return our; } /** - * Insert an osp_update_callback into the dt_update_request. + * Insert an osp_update_callback into the osp_update_request. * - * Insert an osp_update_callback to the dt_update_request. Usually each update - * in the dt_update_request will have one correspondent callback, and these + * Insert an osp_update_callback to the osp_update_request. Usually each update + * in the osp_update_request will have one correspondent callback, and these * callbacks will be called in rq_interpret_reply. * * \param[in] env pointer to the thread context @@ -410,7 +440,7 @@ osp_find_or_create_async_update_request(struct osp_device *osp) * \retval negative error number on failure */ int osp_insert_update_callback(const struct lu_env *env, - struct dt_update_request *update, + struct osp_update_request *our, struct osp_object *obj, void *data, osp_update_interpreter_t interpreter) { @@ -420,7 +450,7 @@ int osp_insert_update_callback(const struct lu_env *env, if (ouc == NULL) RETURN(-ENOMEM); - list_add_tail(&ouc->ouc_list, &update->dur_cb_items); + list_add_tail(&ouc->ouc_list, &our->our_cb_items); return 0; } @@ -454,22 +484,22 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, __u16 *lens, const void **bufs, void *data, osp_update_interpreter_t interpreter) { - struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev); - struct dt_update_request *update; + struct osp_device *osp; + struct osp_update_request *our; struct object_update *object_update; size_t max_update_size; struct object_update_request *ureq; int rc = 0; ENTRY; - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); + osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev); + our = osp_find_or_create_async_update_request(osp); + if (IS_ERR(our)) + RETURN(PTR_ERR(our)); again: - ureq = update->dur_buf.ub_req; - max_update_size = update->dur_buf.ub_req_size - - object_update_request_size(ureq); + ureq = our->our_req; + max_update_size = our->our_req_size - object_update_request_size(ureq); object_update = update_buffer_get_update(ureq, ureq->ourq_count); rc = out_update_pack(env, object_update, max_update_size, op, @@ -479,14 +509,14 @@ again: osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); - rc = osp_unplug_async_request(env, osp, update); + rc = osp_unplug_async_request(env, osp, our); mutex_lock(&osp->opd_async_requests_mutex); if (rc != 0) RETURN(rc); - update = osp_find_or_create_async_update_request(osp); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); + our = osp_find_or_create_async_update_request(osp); + if (IS_ERR(our)) + RETURN(PTR_ERR(our)); goto again; } else { @@ -496,7 +526,7 @@ again: ureq->ourq_count++; } - rc = osp_insert_update_callback(env, update, obj, data, interpreter); + rc = osp_insert_update_callback(env, our, obj, data, interpreter); RETURN(rc); } @@ -504,33 +534,33 @@ again: int osp_trans_update_request_create(struct thandle *th) { struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct dt_update_request *update; + struct osp_update_request *our; - if (oth->ot_dur != NULL) + if (oth->ot_our != NULL) return 0; - update = dt_update_request_create(th->th_dev); - if (IS_ERR(update)) { - th->th_result = PTR_ERR(update); - return PTR_ERR(update); + our = osp_update_request_create(th->th_dev); + if (IS_ERR(our)) { + th->th_result = PTR_ERR(our); + return PTR_ERR(our); } if (dt2osp_dev(th->th_dev)->opd_connect_mdt) - update->dur_flags = UPDATE_FL_SYNC; + our->our_flags = UPDATE_FL_SYNC; - oth->ot_dur = update; + oth->ot_our = our; + our->our_th = oth; return 0; } -static void osp_thandle_get(struct osp_thandle *oth) -{ - atomic_inc(&oth->ot_refcount); -} - -static void osp_thandle_put(struct osp_thandle *oth) +void osp_thandle_destroy(struct osp_thandle *oth) { - if (atomic_dec_and_test(&oth->ot_refcount)) - OBD_FREE_PTR(oth); + LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); + LASSERT(list_empty(&oth->ot_commit_dcb_list)); + LASSERT(list_empty(&oth->ot_stop_dcb_list)); + if (oth->ot_our != NULL) + osp_update_request_destroy(oth->ot_our); + OBD_FREE_PTR(oth); } /** @@ -570,12 +600,14 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) if (unlikely(oth == NULL)) RETURN(ERR_PTR(-ENOMEM)); + oth->ot_magic = OSP_THANDLE_MAGIC; th = &oth->ot_super; th->th_dev = d; th->th_tags = LCT_TX_HANDLE; atomic_set(&oth->ot_refcount, 1); - INIT_LIST_HEAD(&oth->ot_dcb_list); + INIT_LIST_HEAD(&oth->ot_commit_dcb_list); + INIT_LIST_HEAD(&oth->ot_stop_dcb_list); RETURN(th); } @@ -586,22 +618,22 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) * Prepare OUT update ptlrpc request, and the request usually includes * all of updates (stored in \param ureq) from one operation. * - * \param[in] env execution environment - * \param[in] imp import on which ptlrpc request will be sent - * \param[in] ureq hold all of updates which will be packed into the req - * \param[in] reqp request to be created + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] ureq hold all of updates which will be packed into the req + * \param[in] reqp request to be created * - * \retval 0 if preparation succeeds. - * \retval negative errno if preparation fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp) { - struct ptlrpc_request *req; - struct object_update_request *tmp; - int ureq_len; - int rc; + struct ptlrpc_request *req; + struct object_update_request *tmp; + size_t ureq_len; + int rc; ENTRY; object_update_request_dump(ureq, D_INFO); @@ -620,7 +652,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, } req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, - RCL_SERVER, OUT_UPDATE_REPLY_SIZE); + RCL_SERVER, OUT_UPDATE_REPLY_SIZE); tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE); memcpy(tmp, ureq, ureq_len); @@ -634,20 +666,21 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, } /** -* Send update RPC. -* -* Send update request to the remote MDT synchronously. -* -* \param[in] env execution environment -* \param[in] imp import on which ptlrpc request will be sent -* \param[in] dt_update hold all of updates which will be packed into the req -* \param[in] reqp request to be created -* -* \retval 0 if RPC succeeds. -* \retval negative errno if RPC fails. -*/ + * Send update RPC. + * + * Send update request to the remote MDT synchronously. + * + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] our hold all of updates which will be packed into the req + * \param[in] reqp request to be created + * + * \retval 0 if RPC succeeds. + * \retval negative errno if RPC fails. + */ + int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *dt_update, + struct osp_update_request *our, struct ptlrpc_request **reqp) { struct obd_import *imp = osp->opd_obd->u.cli.cl_import; @@ -655,7 +688,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, int rc; ENTRY; - rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req); + rc = osp_prep_update_req(env, imp, our->our_req, &req); if (rc != 0) RETURN(rc); @@ -669,7 +702,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, rc = ptlrpc_queue_wait(req); if (rc < 0) { ptlrpc_req_finished(req); - dt_update->dur_rc = rc; + our->our_rc = rc; RETURN(rc); } @@ -678,7 +711,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, RETURN(rc); } - dt_update->dur_rc = rc; + our->our_rc = rc; ptlrpc_req_finished(req); @@ -702,8 +735,10 @@ int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb) LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC); LASSERT(&dcb->dcb_func != NULL); - list_add(&dcb->dcb_linkage, &oth->ot_dcb_list); - + if (dcb->dcb_flags & DCB_TRANS_STOP) + list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list); + else + list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list); return 0; } @@ -714,7 +749,7 @@ static void osp_trans_commit_cb(struct osp_thandle *oth, int result) LASSERT(atomic_read(&oth->ot_refcount) > 0); /* call per-transaction callbacks if any */ - list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list, + list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list, dcb_linkage) { LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC, "commit callback entry: magic=%x name='%s'\n", @@ -726,12 +761,16 @@ static void osp_trans_commit_cb(struct osp_thandle *oth, int result) static void osp_request_commit_cb(struct ptlrpc_request *req) { - struct thandle *th = req->rq_cb_data; - struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct thandle *th = req->rq_cb_data; + struct osp_thandle *oth; __u64 last_committed_transno = 0; int result = req->rq_status; ENTRY; + if (th == NULL) + RETURN_EXIT; + + oth = thandle_to_osp_thandle(th); if (lustre_msg_get_last_committed(req->rq_repmsg)) last_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); @@ -756,43 +795,91 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) } /** - * Trigger the request for remote updates. + * callback of osp transaction * - * If th_sync is set, then the request will be sent synchronously, - * otherwise, the RPC will be sent asynchronously. + * Call all of callbacks for this osp thandle. This will only be + * called in error handler path. In the normal processing path, + * these callback will be called in osp_request_commit_cb() and + * osp_update_interpret(). + * + * \param [in] env execution environment + * \param [in] oth osp thandle + * \param [in] rc result of the osp thandle + */ +void osp_trans_callback(const struct lu_env *env, + struct osp_thandle *oth, int rc) +{ + struct osp_update_callback *ouc; + struct osp_update_callback *next; + + if (oth->ot_our != NULL) { + list_for_each_entry_safe(ouc, next, + &oth->ot_our->our_cb_items, ouc_list) { + list_del_init(&ouc->ouc_list); + if (ouc->ouc_interpreter != NULL) + ouc->ouc_interpreter(env, NULL, NULL, + ouc->ouc_obj, + ouc->ouc_data, 0, rc); + osp_update_callback_fini(env, ouc); + } + } + osp_trans_stop_cb(oth, rc); + osp_trans_commit_cb(oth, rc); +} + +/** + * Send the request for remote updates. + * + * Send updates to the remote MDT. Prepare the request by osp_update_req + * and send them to remote MDT, for sync request, it will wait + * until the reply return, otherwise hand it to ptlrpcd. * * Please refer to osp_trans_create() for transaction type. * * \param[in] env pointer to the thread context * \param[in] osp pointer to the OSP device - * \param[in] dt_update pointer to the dt_update_request - * \param[in] th pointer to the transaction handler - * \param[out] sent whether the RPC has been sent + * \param[in] our pointer to the osp_update_request * * \retval 0 for success * \retval negative error number on failure */ -static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *dt_update, - struct thandle *th, int *sent) +static int osp_send_update_req(const struct lu_env *env, + struct osp_device *osp, + struct osp_update_request *our) { struct osp_update_args *args; struct ptlrpc_request *req; + struct lu_device *top_device; + struct osp_thandle *oth = our->our_th; int rc = 0; ENTRY; + LASSERT(oth != NULL); + LASSERT(our->our_req_sent == 0); rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - dt_update->dur_buf.ub_req, &req); - if (rc != 0) + our->our_req, &req); + if (rc != 0) { + osp_trans_callback(env, oth, rc); RETURN(rc); + } - *sent = 1; - req->rq_interpret_reply = osp_update_interpret; args = ptlrpc_req_async_args(req); - args->oaua_update = dt_update; + args->oaua_update = our; + osp_thandle_get(oth); /* hold for update interpret */ + req->rq_interpret_reply = osp_update_interpret; + if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) { + if (!osp->opd_imp_active || !osp->opd_imp_connected) { + osp_trans_callback(env, oth, rc); + osp_thandle_put(oth); + GOTO(out, rc = -ENOTCONN); + } - if (is_only_remote_trans(th) && !th->th_sync && - !th->th_wait_submit) { + rc = obd_get_request_slot(&osp->opd_obd->u.cli); + if (rc != 0) { + osp_trans_callback(env, oth, rc); + osp_thandle_put(oth); + GOTO(out, rc = -ENOTCONN); + } args->oaua_flow_control = true; if (!osp->opd_connect_mdt) { @@ -804,9 +891,12 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, } ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + req = NULL; } else { - struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct lu_device *top_device; + osp_thandle_get(oth); /* hold for commit callback */ + req->rq_commit_cb = osp_request_commit_cb; + req->rq_cb_data = &oth->ot_super; + args->oaua_flow_control = false; /* If the transaction is created during MDT recoverying * process, it means this is an recovery update, we need @@ -818,19 +908,28 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, if (top_device->ld_obd->obd_recovering) req->rq_allow_replay = 1; - args->oaua_flow_control = false; - req->rq_commit_cb = osp_request_commit_cb; - req->rq_cb_data = th; - osp_thandle_get(oth); /* for commit callback */ osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); osp_put_rpc_lock(osp); - if (req->rq_transno == 0 && !req->rq_committed) + if ((rc == -ENOMEM && req->rq_set == NULL) || + (req->rq_transno == 0 && !req->rq_committed)) { + if (args->oaua_update != NULL) { + /* If osp_update_interpret is not being called, + * release the osp_thandle */ + args->oaua_update = NULL; + osp_thandle_put(oth); + } + + req->rq_cb_data = NULL; + rc = rc == 0 ? req->rq_status : rc; + osp_trans_callback(env, oth, rc); osp_thandle_put(oth); - else - oth->ot_dur = NULL; - ptlrpc_req_finished(req); + GOTO(out, rc); + } } +out: + if (req != NULL) + ptlrpc_req_finished(req); RETURN(rc); } @@ -885,6 +984,199 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, } /** + * Set version for the transaction + * + * Set the version for the transaction, then the osp RPC will be + * sent in the order of version, i.e. the transaction with lower + * version will be sent first. + * + * \param [in] oth osp thandle to be set version. + * + * \retval 0 if set version succeeds + * negative errno if set version fails. + */ +int osp_check_and_set_rpc_version(struct osp_thandle *oth) +{ + struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev); + struct osp_updates *ou = osp->opd_update; + + if (ou == NULL) + return -EIO; + + if (oth->ot_version != 0) + return 0; + + spin_lock(&ou->ou_lock); + oth->ot_version = ou->ou_version++; + spin_unlock(&ou->ou_lock); + + CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n", + osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version); + + return 0; +} + +/** + * Get next OSP update request in the sending list + * Get next OSP update request in the sending list by version number, next + * request will be + * 1. transaction which does not have a version number. + * 2. transaction whose version == opd_rpc_version. + * + * \param [in] ou osp update structure. + * \param [out] ourp the pointer holding the next update request. + * + * \retval true if getting the next transaction. + * \retval false if not getting the next transaction. + */ +static bool +osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp) +{ + struct osp_update_request *our; + struct osp_update_request *tmp; + bool got_req = false; + + spin_lock(&ou->ou_lock); + list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { + LASSERT(our->our_th != NULL); + CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n", + our, our->our_th->ot_version, ou->ou_rpc_version); + if (our->our_th->ot_version == 0) { + list_del_init(&our->our_list); + *ourp = our; + got_req = true; + break; + } + + /* Find next osp_update_request in the list */ + if (our->our_th->ot_version == ou->ou_rpc_version) { + list_del_init(&our->our_list); + *ourp = our; + got_req = true; + break; + } + } + spin_unlock(&ou->ou_lock); + + return got_req; +} + +static void osp_update_rpc_version(struct osp_updates *ou, + struct osp_thandle *oth) +{ + if (oth->ot_version == 0) + return; + + LASSERT(oth->ot_version == ou->ou_rpc_version); + spin_lock(&ou->ou_lock); + ou->ou_rpc_version++; + spin_unlock(&ou->ou_lock); +} + +/** + * Sending update thread + * + * Create thread to send update request to other MDTs, this thread will pull + * out update request from the list in OSP by version number, i.e. it will + * make sure the update request with lower version number will be sent first. + * + * \param[in] arg hold the OSP device. + * + * \retval 0 if the thread is created successfully. + * \retal negative error if the thread is not created + * successfully. + */ +int osp_send_update_thread(void *arg) +{ + struct lu_env env; + struct osp_device *osp = arg; + struct l_wait_info lwi = { 0 }; + struct osp_updates *ou = osp->opd_update; + struct ptlrpc_thread *thread = &osp->opd_update_thread; + struct osp_update_request *our = NULL; + int rc; + ENTRY; + + LASSERT(ou != NULL); + rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags); + if (rc < 0) { + CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, + rc); + RETURN(rc); + } + + thread->t_flags = SVC_RUNNING; + wake_up(&thread->t_ctl_waitq); + while (1) { + our = NULL; + l_wait_event(ou->ou_waitq, + !osp_send_update_thread_running(osp) || + osp_get_next_request(ou, &our), + &lwi); + + if (!osp_send_update_thread_running(osp)) { + if (our != NULL && our->our_th != NULL) { + osp_trans_callback(&env, our->our_th, -EINTR); + osp_thandle_put(our->our_th); + } + break; + } + + if (our->our_req_sent == 0) { + if (our->our_th != NULL && + our->our_th->ot_super.th_result != 0) + osp_trans_callback(&env, our->our_th, + our->our_th->ot_super.th_result); + else + rc = osp_send_update_req(&env, osp, our); + } + + if (our->our_th != NULL) { + /* Update the rpc version */ + osp_update_rpc_version(ou, our->our_th); + /* Balanced for thandle_get in osp_trans_trigger() */ + osp_thandle_put(our->our_th); + } + } + + thread->t_flags = SVC_STOPPED; + lu_env_fini(&env); + wake_up(&thread->t_ctl_waitq); + + RETURN(0); +} + +/** + * Trigger the request for remote updates. + * + * Add the request to the sending list, and wake up osp update + * sending thread. + * + * \param[in] env pointer to the thread context + * \param[in] osp pointer to the OSP device + * \param[in] oth pointer to the transaction handler + * + */ +static void osp_trans_trigger(const struct lu_env *env, + struct osp_device *osp, + struct osp_thandle *oth) +{ + + CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n", + osp->opd_obd->obd_name, oth, oth->ot_version); + + LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); + osp_thandle_get(oth); + LASSERT(oth->ot_our != NULL); + spin_lock(&osp->opd_update->ou_lock); + list_add_tail(&oth->ot_our->our_list, + &osp->opd_update->ou_list); + spin_unlock(&osp->opd_update->ou_lock); + + wake_up(&osp->opd_update->ou_waitq); +} + +/** * The OSP layer dt_device_operations::dt_trans_start() interface * to start the transaction. * @@ -934,9 +1226,9 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct dt_update_request *dt_update; + struct osp_update_request *our = oth->ot_our; + struct osp_device *osp = dt2osp_dev(dt); int rc = 0; - int sent = 0; ENTRY; /* For remote transaction, if there is local storage thandle, @@ -947,67 +1239,35 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, oth->ot_storage_th = NULL; } - dt_update = oth->ot_dur; - if (dt_update == NULL || th->th_result != 0) { - rc = th->th_result; - GOTO(out, rc); + if (our == NULL || our->our_req == NULL || + our->our_req->ourq_count == 0) { + osp_trans_callback(env, oth, th->th_result); + GOTO(out, rc = th->th_result); } - LASSERT(dt_update != LP_POISON); - - /* If there are no updates, destroy dt_update and thandle */ - if (dt_update->dur_buf.ub_req == NULL || - dt_update->dur_buf.ub_req->ourq_count == 0) + if (!osp->opd_connect_mdt) { + rc = osp_send_update_req(env, osp, oth->ot_our); GOTO(out, rc); - - if (is_only_remote_trans(th) && !th->th_sync && - !th->th_wait_submit) { - struct osp_device *osp = dt2osp_dev(th->th_dev); - struct client_obd *cli = &osp->opd_obd->u.cli; - - rc = obd_get_request_slot(cli); - if (rc != 0) - GOTO(out, rc); - - if (!osp->opd_imp_active || !osp->opd_imp_connected) { - obd_put_request_slot(cli); - GOTO(out, rc = -ENOTCONN); - } - - rc = osp_trans_trigger(env, dt2osp_dev(dt), - dt_update, th, &sent); - if (rc != 0) - obd_put_request_slot(cli); - } else { - rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, - th, &sent); } -out: - /* If RPC is triggered successfully, dt_update will be freed in - * osp_update_interpreter() */ - if (sent == 0) { - struct osp_update_callback *ouc; - struct osp_update_callback *next; + if (osp->opd_update == NULL || + !osp_send_update_thread_running(osp)) { + osp_trans_callback(env, oth, -EIO); + GOTO(out, rc = -EIO); + } - if (dt_update != NULL) { - list_for_each_entry_safe(ouc, next, - &dt_update->dur_cb_items, - ouc_list) { - list_del_init(&ouc->ouc_list); - if (ouc->ouc_interpreter != NULL) - ouc->ouc_interpreter(env, NULL, NULL, - ouc->ouc_obj, - ouc->ouc_data, 0, - rc); - osp_update_callback_fini(env, ouc); - } - } - osp_trans_commit_cb(oth, rc); - dt_update_request_destroy(dt_update); - oth->ot_dur = NULL; + if (th->th_sync) { + /* if th_sync is set, then it needs to be sent + * right away. Note: even thought the RPC has been + * sent, it still needs to be added to the sending + * list (see osp_trans_trigger()), so ou_rpc_version + * can be updated correctly. */ + rc = osp_send_update_req(env, osp, our); + our->our_req_sent = 1; } + osp_trans_trigger(env, osp, oth); +out: osp_thandle_put(oth); RETURN(rc); diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 11dc864..9e63594 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -365,7 +365,7 @@ static int out_xattr_set(struct tgt_session_info *tsi) lbuf->lb_len = buf_len; tmp = object_update_param_get(update, 2, &size); - if (tmp == NULL || size != sizeof(*tmp)) { + if (tmp == NULL || IS_ERR(tmp) || size != sizeof(*tmp)) { CERROR("%s: emptry or wrong size %zu flag: rc = %d\n", tgt_name(tsi->tsi_tgt), size, -EPROTO); RETURN(err_serious(-EPROTO)); @@ -616,7 +616,7 @@ static int out_read(struct tgt_session_info *tsi) GOTO(out, rc = -ENOENT); tmp = object_update_param_get(update, 0, NULL); - if (tmp == NULL) { + if (tmp == NULL || IS_ERR(tmp)) { CERROR("%s: empty size for read: rc = %d\n", tgt_name(tsi->tsi_tgt), -EPROTO); GOTO(out, rc = err_serious(-EPROTO)); @@ -624,7 +624,7 @@ static int out_read(struct tgt_session_info *tsi) size = le64_to_cpu(*(size_t *)(tmp)); tmp = object_update_param_get(update, 1, NULL); - if (tmp == NULL) { + if (tmp == NULL || IS_ERR(tmp)) { CERROR("%s: empty pos for read: rc = %d\n", tgt_name(tsi->tsi_tgt), -EPROTO); GOTO(out, rc = err_serious(-EPROTO)); diff --git a/lustre/target/update_recovery.c b/lustre/target/update_recovery.c index 1b5ebca..2fd87d7b 100644 --- a/lustre/target/update_recovery.c +++ b/lustre/target/update_recovery.c @@ -302,7 +302,7 @@ again: } if (rc == -EEXIST) { - dtrq_destory(dtrq); + dtrq_destroy(dtrq); rc = 0; goto again; } @@ -342,7 +342,7 @@ EXPORT_SYMBOL(dtrq_list_dump); * * \param[in] dtrq distribute txn replqy req to be destroyed. */ -void dtrq_destory(struct distribute_txn_replay_req *dtrq) +void dtrq_destroy(struct distribute_txn_replay_req *dtrq) { struct distribute_txn_replay_req_sub *dtrqs; struct distribute_txn_replay_req_sub *tmp; @@ -360,7 +360,7 @@ void dtrq_destory(struct distribute_txn_replay_req *dtrq) OBD_FREE_PTR(dtrq); } -EXPORT_SYMBOL(dtrq_destory); +EXPORT_SYMBOL(dtrq_destroy); /** * Destroy all of replay req. @@ -378,7 +378,7 @@ void dtrq_list_destroy(struct target_distribute_txn_data *tdtd) list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list, dtrq_list) { list_del_init(&dtrq->dtrq_list); - dtrq_destory(dtrq); + dtrq_destroy(dtrq); } spin_unlock(&tdtd->tdtd_replay_list_lock); } @@ -1015,7 +1015,7 @@ int distribute_txn_replay_handle(struct lu_env *env, struct top_thandle *top_th; struct top_multiple_thandle *tmt; struct thandle_update_records *tur = NULL; - unsigned int i; + int i; int rc = 0; ENTRY; @@ -1098,7 +1098,7 @@ int distribute_txn_replay_handle(struct lu_env *env, CDEBUG(D_HA, "error during execution of #%u from" " %s:%d: rc = %d\n", i, ta->ta_args[i]->file, ta->ta_args[i]->line, rc); - while (--i >= 0) { + while (--i > 0) { if (ta->ta_args[i]->undo_fn != NULL) { dt_obj = ta->ta_args[i]->object; sub_dt = diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index e56c1ec..0d2d995 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -320,7 +320,10 @@ struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, * Mark the sub thandle to be committed and if all sub thandle are committed * notify the top thandle. * - * \param[in] sub_th sub thandle being committed. + * \param[in] env execution environment + * \param[in] sub_th sub thandle being committed + * \param[in] cb commit callback + * \param[in] err trans result */ static void sub_trans_commit_cb(struct lu_env *env, struct thandle *sub_th, @@ -364,6 +367,48 @@ static void sub_thandle_register_commit_cb(struct sub_thandle *st, } /** + * Sub thandle stop call back + * + * After sub thandle is stopped, it will call this callback to notify + * the top thandle. + * + * \param[in] th sub thandle to be stopped + * \param[in] rc result of sub trans + */ +static void sub_trans_stop_cb(struct lu_env *env, + struct thandle *sub_th, + struct dt_txn_commit_cb *cb, int err) +{ + struct sub_thandle *st; + struct top_multiple_thandle *tmt = cb->dcb_data; + ENTRY; + + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (st->st_stopped) + continue; + + if (st->st_dt == sub_th->th_dev) { + st->st_stopped = 1; + st->st_result = err; + break; + } + } + + wake_up(&tmt->tmt_stop_waitq); + RETURN_EXIT; +} + +static void sub_thandle_register_stop_cb(struct sub_thandle *st, + struct top_multiple_thandle *tmt) +{ + st->st_stop_dcb.dcb_func = sub_trans_stop_cb; + st->st_stop_dcb.dcb_data = tmt; + st->st_stop_dcb.dcb_flags = DCB_TRANS_STOP; + INIT_LIST_HEAD(&st->st_stop_dcb.dcb_linkage); + dt_trans_cb_add(st->st_sub_th, &st->st_stop_dcb); +} + +/** * Create sub thandle * * Create transaction handle for sub_thandle @@ -392,7 +437,6 @@ int sub_thandle_trans_create(const struct lu_env *env, return 0; } - /** * Create the top transaction. * @@ -611,6 +655,7 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, if (rc != 0) GOTO(out, rc); + sub_thandle_register_stop_cb(st, tmt); sub_thandle_register_commit_cb(st, tmt); } out: @@ -659,6 +704,57 @@ static bool top_check_write_updates(struct top_thandle *top_th) } /** + * Check if top transaction is stopped + * + * Check if top transaction is stopped, only if all sub transaction + * is stopped, then the top transaction is stopped. + * + * \param [in] top_th top thandle + * + * \retval true if the top transaction is stopped. + * \retval false if the top transaction is not stopped. + */ +static bool top_trans_is_stopped(struct top_thandle *top_th) +{ + struct top_multiple_thandle *tmt; + struct sub_thandle *st; + bool all_stopped = true; + + tmt = top_th->tt_multiple_thandle; + list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { + if (!st->st_stopped && st->st_sub_th != NULL) { + all_stopped = false; + break; + } + + if (st->st_result != 0 && + top_th->tt_super.th_result == 0) + top_th->tt_super.th_result = st->st_result; + } + + return all_stopped; +} + +/** + * Wait result of top transaction + * + * Wait until all sub transaction get its result. + * + * \param [in] top_th top thandle. + * + * \retval the result of top thandle. + */ +static int top_trans_wait_result(struct top_thandle *top_th) +{ + struct l_wait_info lwi = {0}; + + l_wait_event(top_th->tt_multiple_thandle->tmt_stop_waitq, + top_trans_is_stopped(top_th), &lwi); + + RETURN(top_th->tt_super.th_result); +} + +/** * Stop the top transaction. * * Stop the transaction on the master device first, then stop transactions @@ -806,7 +902,10 @@ stop_other_trans: th->th_result = rc; } + rc = top_trans_wait_result(top_th); + tmt->tmt_result = rc; + /* Balance for the refcount in top_trans_create, Note: if it is NOT * multiple node transaction, the top transaction will be destroyed. */ top_multiple_thandle_put(tmt); @@ -842,6 +941,7 @@ int top_trans_create_tmt(const struct lu_env *env, INIT_LIST_HEAD(&tmt->tmt_commit_list); atomic_set(&tmt->tmt_refcount, 1); + init_waitqueue_head(&tmt->tmt_stop_waitq); top_th->tt_multiple_thandle = tmt; return 0; @@ -860,6 +960,7 @@ create_sub_thandle_with_thandle(struct top_thandle *top_th, return st; st->st_sub_th = sub_th; + sub_th->th_top = &top_th->tt_super; return st; } @@ -1135,7 +1236,7 @@ distribute_txn_commit_batchid_update(const struct lu_env *env, if (rc < 0) GOTO(stop, rc); - dt_trans_cb_add(th, &dtbd->dtbd_cb); + rc = dt_trans_cb_add(th, &dtbd->dtbd_cb); if (rc < 0) GOTO(stop, rc); @@ -1188,8 +1289,11 @@ distribute_txn_commit_batchid_init(const struct lu_env *env, dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof, attr); - if (IS_ERR(dt_obj)) - GOTO(out_put, rc = PTR_ERR(dt_obj)); + if (IS_ERR(dt_obj)) { + rc = PTR_ERR(dt_obj); + dt_obj = NULL; + GOTO(out_put, rc); + } tdtd->tdtd_batchid_obj = dt_obj; -- 1.8.3.1